streamcache

package module
v0.3.5 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 12, 2024 License: MIT Imports: 5 Imported by: 1

README

streamcache: in-memory caching stream reader

Package streamcache implements a Go in-memory byte cache mechanism that allows multiple callers to read some or all of the contents of a source io.Reader, while only reading from the source reader once. When only the final reader remains, the cache is discarded and the final reader reads directly from the source. This is particularly useful for scenarios where multiple readers may wish to sample the start of a stream, but only one reader will read the entire stream.

Let's say we have a program typedetect, and we're reading from stdin. For example:

$ cat myfile.ext | typedetect  

In this scenario, typedetect wants to detect and print the type of data in the file/pipe, and then print the contents. That detection sampling could be done in a separate goroutine per sampler type. The input file could be, let's say, a JSON file, or an XML file.

The obvious approach is to inspect the first few tokens of the input, and check if the tokens are either valid JSON or valid XML. After that process, let's say we want to dump out a preview of the file contents.

Package streamcache provides a facility to create a Stream from an underlying io.Reader (os.Stdin in this scenario), and spawn multiple readers, each of which can operate independently, in their own goroutines if desired. The underlying source (again, os.Stdin in this scenario) will only be read from once, but its data is available to multiple readers, because that data is cached in memory.

That is, until only one final reader remains (after Stream.Seal has been invoked), at which point the cache is discarded and the final Reader reads directly from the underlying source.

Usage

Add to your go.mod via go get:

go get github.com/neilotoole/streamcache

Here's a simple example that copies the contents of stdin to stdout and stderr, and prints the number of bytes read.

package main

import (
    "context"
    "errors"
    "fmt"
    "io"
    "os"

    "github.com/neilotoole/streamcache"
)

// Write stdin to both stdout and stderr.
// Some error handling omitted for brevity.
func main() {
    ctx := context.Background()
    stream := streamcache.New(os.Stdin)

    r1 := stream.NewReader(ctx)
    go func() {
        defer r1.Close()
        io.Copy(os.Stdout, r1)
    }()

    r2 := stream.NewReader(ctx)
    go func() {
        defer r2.Close()
        io.Copy(os.Stderr, r2)
    }()
    
    stream.Seal()   // Indicate that there'll be no more readers...
    <-stream.Done() // Receives when all readers are closed.

    if err := stream.Err(); err != nil && !errors.Is(err, io.EOF) {
        fmt.Fprintln(os.Stderr, "error:", err)
        os.Exit(1)
    }

    fmt.Fprintf(os.Stdout, "Read %d bytes from stdin\n", stream.Size())
}

Executing the above program:

$ go install github.com/neilotoole/streamcache/examples/in-out-err
$ echo "hello world" | in-out-err
hello world
hello world
Read 12 bytes from stdin

Examples

  • in-out-err: copy stdin to both stdout and stderr.
  • typedetect: detect the type of input data, and print the head and tail of the contents.
  • multicase: transform each line of input to upper, lower, and title case.

Documentation

Overview

Package streamcache implements an in-memory cache mechanism that allows multiple callers to read some or all of the contents of a source reader, while only reading from the source reader once; when there's only one final reader remaining, the cache is discarded and the final reader reads directly from the source.

Let's say we're reading from stdin. For example:

$ cat myfile.ext | myprogram

In this scenario, myprogram wants to detect the type of data in the file/pipe, and then print it out. That sampling could be done in a separate goroutine per sampler type. The input file could be, let's say, a CSV file, or a JSON file.

The obvious approach is to inspect the first few lines of the input, and check if the input is either valid CSV, or valid JSON. After that process, let's say we want to dump out the entire contents of the input.

Package streamcache provides a facility to create a caching Stream from an underlying io.Reader (os.Stdin in this scenario), and spawn multiple readers, each of which can operate independently, in their own goroutines if desired. The underlying source (again, os.Stdin in this scenario) will only be read from once, but its data is available to multiple readers, because that data is cached in memory.

That is, until after Stream.Seal is invoked: when there's only one final reader left, the cache is discarded, and the final reader reads directly from the underlying source.

The entrypoint to this package is streamcache.New, which returns a new Stream instance, from which readers can be created via Stream.NewReader.

Index

Constants

This section is empty.

Variables

var ErrAlreadyClosed = errors.New("reader is already closed")

ErrAlreadyClosed is returned by Reader.Read if the reader is already closed.

Functions

This section is empty.

Types

type Reader added in v0.3.0

type Reader struct {
	// contains filtered or unexported fields
}

Reader is returned by Stream.NewReader. It implements io.ReadCloser; the caller must close the Reader when finished with it.

func (*Reader) Close added in v0.3.0

func (r *Reader) Close() error

Close closes this Reader. If the parent Stream is not sealed, this method ultimately returns nil. If the parent Stream is sealed and this is the last remaining reader, the Stream's source reader is closed, if it implements io.Closer. At that point, the channel returned by Stream.Done is closed.

If you don't want the source to be closed, wrap it via io.NopCloser before passing it to streamcache.New.

The Close operation proceeds even if the non-nil context provided to Stream.NewReader is cancelled. That is to say, Reader.Close ignores context.

Note that subsequent calls to Close are no-ops and return the same result as the first call.

func (*Reader) Read added in v0.3.0

func (r *Reader) Read(p []byte) (n int, err error)

Read reads from the stream. If a non-nil context was provided to Stream.NewReader to create this Reader, that context is checked at the start of each call to Read (and possibly at other checkpoints): if the context has been canceled, Read will return the context's error via context.Cause. Note however that Read can still block on reading from the Stream source. If this reader has already been closed via Reader.Close, Read will return ErrAlreadyClosed. If a previous invocation of Read returned an error from the source, that error is returned.

Otherwise Read reads from Stream, which may return bytes from Stream's cache or new bytes from the source, or a combination of both. Note in particular that Read preferentially returns available bytes from the cache rather than waiting to read from the source, even if that means the returned n < len(p). This is in line with the io.Reader convention:

If some data is available but not len(p) bytes, Read conventionally
returns what is available instead of waiting for more.

Use io.ReadFull or io.ReadAtLeast if you want to ensure that p is filled.

Read is not safe for concurrent use.

type Stream added in v0.3.0

type Stream struct {
	// contains filtered or unexported fields
}

Stream mediates access to the bytes of an underlying source io.Reader. Multiple callers can invoke Stream.NewReader to obtain a Reader, each of which can read the full or partial contents of the source reader. Note that the source is only read from once, and the returned bytes are cached in memory. After Stream.Seal is invoked and readers are closed, the final reader discards the cache and reads directly from the source for the remaining bytes.

func New added in v0.3.0

func New(src io.Reader) *Stream

New returns a new Stream that wraps src. Use Stream.NewReader to read from src.

func (*Stream) Done added in v0.3.0

func (s *Stream) Done() <-chan struct{}

Done returns a channel that is closed when the Stream is sealed and all remaining readers are closed.

s.Seal()
select {
case <-s.Done():
  fmt.Println("All readers are closed")
  if err := s.Err(); err != nil {
    fmt.Println("But an error occurred:", err)
  }
default:
  fmt.Println("The stream still is being read from")
}

IMPORTANT: Don't wait on the Done channel without also calling Stream.Seal, as you may end up in deadlock. The returned channel will never be closed unless Stream.Seal is invoked.

Note that Stream.Err returning a non-nil value does not of itself indicate that all readers are closed. There could be other readers still consuming earlier parts of the cache.

Note also that it's possible that even after the returned channel is closed, Stream may not have closed its underlying source reader. For example, if a Stream is created and immediately sealed, the channel returned by Done is closed, although the underlying source reader was never closed. The source reader is closed only by closing the final Reader instance that was active after Seal is invoked.

See also: Stream.Filled.

func (*Stream) Err added in v0.3.0

func (s *Stream) Err() error

Err returns the first error (if any) that was returned by the underlying source reader, which may be io.EOF. After the source reader returns an error, it is never read from again, and the channel returned by Stream.Filled is closed.

func (*Stream) Filled added in v0.3.0

func (s *Stream) Filled() <-chan struct{}

Filled returns a channel that is closed when the underlying source reader returns an error, including io.EOF. If the source reader returns an error, it is never read from again. If the source reader does not return an error, this channel is never closed.

See also: Stream.Done.

func (*Stream) NewReader added in v0.3.0

func (s *Stream) NewReader(ctx context.Context) *Reader

NewReader returns a new Reader from Stream. If ctx is non-nil, it is checked for cancellation at the start of Reader.Read (and possibly at other checkpoints).

It is the caller's responsibility to close the returned Reader.

NewReader panics if s is already sealed via Stream.Seal (but note that you can first test via Stream.Sealed).

See: Reader.Read, Reader.Close.

func (*Stream) Seal added in v0.3.0

func (s *Stream) Seal()

Seal is called to indicate that no more calls to NewReader are permitted. If there are no unclosed readers when Seal is invoked, the Stream.Done channel is closed, and the Stream is considered finished. Subsequent invocations are no-ops.

func (*Stream) Sealed added in v0.3.0

func (s *Stream) Sealed() bool

Sealed returns true if Seal has been invoked.

func (*Stream) Size added in v0.3.0

func (s *Stream) Size() int

Size returns the current count of bytes read from the source reader. This value increases as readers read from the Stream.

See also: Stream.Total.

func (*Stream) Source added in v0.3.3

func (s *Stream) Source() io.Reader

Source returns the Stream's underlying source io.Reader.

This can be useful if you need to force close the source for some reason, e.g.

stream.Source().(io.Closer).Close()

The Stream's behavior is undefined if the caller reads from the source directly.

func (*Stream) Total added in v0.3.0

func (s *Stream) Total(ctx context.Context) (size int, err error)

Total blocks until the source reader is fully read, and returns the total number of bytes read from the source, and any read error other than io.EOF returned by the source. If ctx is cancelled, zero and the context's cause error (per context.Cause) are returned. If source returned a non-EOF error, that error and the total number of bytes read are returned.

Note that Total only returns if the channel returned by Stream.Filled is closed, but Total can return even if Stream.Done is not closed. That is to say, Total returning does not necessarily mean that all readers are closed.

See also: Stream.Size, Stream.Err, Stream.Filled, Stream.Done.

Directories

Path Synopsis
examples
in-out-err
Package main contains the "in-out-err" example program, which reads from stdin and writes to both stdout and stderr.
multicase
Package main provides the "multicase" example CLI that reads from stdin and outputs each line in lower, upper, and title case.
typedetect
Package main provides the "typedetect" example CLI that detects the type of a data file, e.g.
