Overview
Package streamcache implements an in-memory cache mechanism that allows multiple callers to read some or all of the contents of a source reader, while only reading from the source reader once; when there's only one final reader remaining, the cache is discarded and the final reader reads directly from the source.
Let's say we're reading from stdin. For example:
$ cat myfile.ext | myprogram
In this scenario, myprogram wants to detect the type of data in the file/pipe, and then print it out. Each type detector could sample the input in its own goroutine. The input could be, let's say, a CSV file or a JSON file.
The obvious approach is to inspect the first few lines of the input, and check if the input is either valid CSV, or valid JSON. After that process, let's say we want to dump out the entire contents of the input.
Package streamcache provides a facility to create a caching Stream from an underlying io.Reader (os.Stdin in this scenario), and spawn multiple readers, each of which can operate independently, in their own goroutines if desired. The underlying source (again, os.Stdin in this scenario) will only be read from once, but its data is available to multiple readers, because that data is cached in memory.
That is, until after Stream.Seal is invoked: when there's only one final reader left, the cache is discarded, and the final reader reads directly from the underlying source.
The entrypoint to this package is streamcache.New, which returns a new Stream instance, from which readers can be created via Stream.NewReader.
Index
- Variables
- type Reader
- type Stream
- func (s *Stream) Done() <-chan struct{}
- func (s *Stream) Err() error
- func (s *Stream) Filled() <-chan struct{}
- func (s *Stream) NewReader(ctx context.Context) *Reader
- func (s *Stream) Seal()
- func (s *Stream) Sealed() bool
- func (s *Stream) Size() int
- func (s *Stream) Source() io.Reader
- func (s *Stream) Total(ctx context.Context) (size int, err error)
Constants
This section is empty.
Variables
var ErrAlreadyClosed = errors.New("reader is already closed")
ErrAlreadyClosed is returned by Reader.Read if the reader is already closed.
Functions
This section is empty.
Types
type Reader added in v0.3.0
type Reader struct {
// contains filtered or unexported fields
}
Reader is returned by Stream.NewReader. It implements io.ReadCloser; the caller must close the Reader when finished with it.
func (*Reader) Close added in v0.3.0
Close closes this Reader. If the parent Stream is not sealed, this method ultimately returns nil. If the parent Stream is sealed and this is the last remaining reader, the Stream's source reader is closed, if it implements io.Closer. At that point, the channel returned by Stream.Done is closed.
If you don't want the source to be closed, wrap it via io.NopCloser before passing it to streamcache.New.
The Close operation proceeds even if the non-nil context provided to Stream.NewReader is cancelled. That is to say, Reader.Close ignores context.
Note that subsequent calls to Close are no-ops and return the same result as the first call.
func (*Reader) Read added in v0.3.0
Read reads from the stream. If a non-nil context was provided to Stream.NewReader to create this Reader, that context is checked at the start of each call to Read (and possibly at other checkpoints): if the context has been canceled, Read will return the context's error via context.Cause. Note however that Read can still block on reading from the Stream source. If this reader has already been closed via Reader.Close, Read will return ErrAlreadyClosed. If a previous invocation of Read returned an error from the source, that error is returned.
Otherwise Read reads from Stream, which may return bytes from Stream's cache or new bytes from the source, or a combination of both. Note in particular that Read preferentially returns available bytes from the cache rather than waiting to read from the source, even if that means the returned n < len(p). This is in line with the io.Reader convention:
If some data is available but not len(p) bytes, Read conventionally returns what is available instead of waiting for more.
Use io.ReadFull or io.ReadAtLeast if you want to ensure that p is filled.
Read is not safe for concurrent use.
type Stream added in v0.3.0
type Stream struct {
// contains filtered or unexported fields
}
Stream mediates access to the bytes of an underlying source io.Reader. Multiple callers can invoke Stream.NewReader to obtain a Reader, each of which can read the full or partial contents of the source reader. Note that the source is only read from once, and the returned bytes are cached in memory. After Stream.Seal is invoked and readers are closed, the final reader discards the cache and reads directly from the source for the remaining bytes.
func New added in v0.3.0
func New(src io.Reader) *Stream
New returns a new Stream that wraps src. Use Stream.NewReader to read from src.
func (*Stream) Done added in v0.3.0
func (s *Stream) Done() <-chan struct{}
Done returns a channel that is closed when the Stream is sealed and all remaining readers are closed.
    s.Seal()
    select {
    case <-s.Done():
        fmt.Println("All readers are closed")
        if err := s.Err(); err != nil {
            fmt.Println("But an error occurred:", err)
        }
    default:
        fmt.Println("The stream still is being read from")
    }
IMPORTANT: Don't wait on the Done channel without also calling Stream.Seal, as you may end up in deadlock. The returned channel will never be closed unless Stream.Seal is invoked.
Note that Stream.Err returning a non-nil value does not of itself indicate that all readers are closed. There could be other readers still consuming earlier parts of the cache.
Note also that it's possible that even after the returned channel is closed, Stream may not have closed its underlying source reader. For example, if a Stream is created and immediately sealed, the channel returned by Done is closed, although the underlying source reader was never closed. The source reader is closed only by closing the final Reader instance that was active after Seal is invoked.
See also: Stream.Filled.
func (*Stream) Err added in v0.3.0
func (s *Stream) Err() error
Err returns the first error (if any) that was returned by the underlying source reader, which may be io.EOF. After the source reader returns an error, it is never read from again, and the channel returned by Stream.Filled is closed.
func (*Stream) Filled added in v0.3.0
func (s *Stream) Filled() <-chan struct{}
Filled returns a channel that is closed when the underlying source reader returns an error, including io.EOF. If the source reader returns an error, it is never read from again. If the source reader does not return an error, this channel is never closed.
See also: Stream.Done.
func (*Stream) NewReader added in v0.3.0
func (s *Stream) NewReader(ctx context.Context) *Reader
NewReader returns a new Reader from Stream. If ctx is non-nil, it is checked for cancellation at the start of Reader.Read (and possibly at other checkpoints).
It is the caller's responsibility to close the returned Reader.
NewReader panics if s is already sealed via Stream.Seal (but note that you can first test via Stream.Sealed).
See: Reader.Read, Reader.Close.
func (*Stream) Seal added in v0.3.0
func (s *Stream) Seal()
Seal indicates that no more calls to NewReader are permitted. If there are no unclosed readers when Seal is invoked, the Stream.Done channel is closed, and the Stream is considered finished. Subsequent invocations are no-ops.
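The rules spread across NewReader, Seal, Reader.Close and Done amount to a small lifecycle. NewReader panics after Seal. Seal with no open readers closes Done immediately. Otherwise, the last Close after Seal closes Done. The mutex-guarded sketch below models only that bookkeeping. The lifecycle type and its methods are hypothetical. The sketch does not track individual readers, so closing the same reader twice would miscount, which the real Reader avoids by making Close idempotent.

```go
package main

import (
	"fmt"
	"sync"
)

// lifecycle is a hypothetical model of the documented Seal/Done rules.
type lifecycle struct {
	mu      sync.Mutex
	sealed  bool
	readers int
	done    chan struct{}
}

func newLifecycle() *lifecycle {
	return &lifecycle{done: make(chan struct{})}
}

// newReader mirrors NewReader: it panics once the lifecycle is sealed.
func (l *lifecycle) newReader() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.sealed {
		panic("lifecycle is already sealed")
	}
	l.readers++
}

// closeReader mirrors Reader.Close: the last close after seal closes done.
func (l *lifecycle) closeReader() {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.readers--
	if l.sealed && l.readers == 0 {
		close(l.done)
	}
}

// seal mirrors Seal: with no open readers, done is closed immediately.
// Later calls are no-ops.
func (l *lifecycle) seal() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.sealed {
		return
	}
	l.sealed = true
	if l.readers == 0 {
		close(l.done)
	}
}

// isDone reports whether done has been closed, without blocking.
func (l *lifecycle) isDone() bool {
	select {
	case <-l.done:
		return true
	default:
		return false
	}
}

func main() {
	l := newLifecycle()
	l.newReader()
	l.newReader()
	l.seal()
	fmt.Println(l.isDone()) // false: two readers still open
	l.closeReader()
	l.closeReader()
	fmt.Println(l.isDone()) // true: sealed and the last reader closed
}
```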
func (*Stream) Size added in v0.3.0
func (s *Stream) Size() int
Size returns the current count of bytes read from the source reader. This value increases as readers read from the Stream.
See also: Stream.Total.
func (*Stream) Source added in v0.3.3
func (s *Stream) Source() io.Reader
Source returns the Stream's underlying source io.Reader.
This can be useful if you need to force close the source for some reason, e.g.
stream.Source().(io.Closer).Close()
The Stream's behavior is undefined if the caller reads from the source directly.
func (*Stream) Total added in v0.3.0
func (s *Stream) Total(ctx context.Context) (size int, err error)
Total blocks until the source reader is fully read, then returns the total number of bytes read from the source along with any read error other than io.EOF. If ctx is canceled, Total returns zero and the context's cause error (per context.Cause). If the source returned a non-EOF error, Total returns that error together with the total number of bytes read.
Note that, apart from context cancellation, Total returns only after the channel returned by Stream.Filled is closed, but it can return while Stream.Done is still open. That is to say, Total returning does not necessarily mean that all readers are closed.
See also: Stream.Size, Stream.Err, Stream.Filled, Stream.Done.
Directories
Path | Synopsis |
---|---|
examples | |
examples/in-out-err | Package main contains the "in-out-err" example program, which reads from stdin and writes to both stdout and stderr. |
examples/multicase | Package main provides the "multicase" example CLI that reads from stdin and outputs each line in lower, upper, and title case. |
examples/typedetect | Package main provides the "typedetect" example CLI that detects the type of a data file, e.g. |