Documentation ¶
Overview ¶
Package io provides asynchronous reader and writer implementations.
Next to hashing, the bulk of the time is spent on input and output operations. For devices to achieve their maximum bandwidth, it is important not to stress them with too many parallel IOPs, which makes it necessary to control the number of input/output streams. This is the key feature of the ReadChannelController and WriteChannelController types implemented here.
Similar to io.MultiWriter, there is a ParallelMultiWriter implementation which sends bytes to multiple writers at once.
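The stream-limiting idea behind the controllers can be sketched with a plain standard-library worker pool. This is an illustrative analogue only; the names below (`copyJob`, `nprocs`) are hypothetical and not part of this package, which uses ChannelReader/ChannelWriter values instead:

```go
package main

import (
	"fmt"
	"sync"
)

// copyJob stands in for one pending read or write task.
type copyJob struct{ id int }

func main() {
	const nprocs = 2 // cap the number of parallel IO streams
	jobs := make(chan copyJob)
	var wg sync.WaitGroup

	// Only nprocs goroutines ever touch the device at once, no matter
	// how many jobs are queued.
	for i := 0; i < nprocs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				// The real implementation would perform the actual
				// read or write for this job here.
				_ = j
			}
		}()
	}

	for i := 0; i < 5; i++ {
		jobs <- copyJob{id: i}
	}
	close(jobs)
	wg.Wait()
	fmt.Println("done")
}
```

The same pattern underlies both controller types: the consumers are fixed in number, so device parallelism is bounded regardless of how many producers exist.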
Index ¶
- Constants
- func DeviceMap(paths []string) [][]string
- func NewUncheckedParallelMultiWriter(writers ...io.Writer) io.Writer
- type BytesVolume
- type ChannelReader
- type ChannelWriter
- type LazyFileWriteCloser
- type ParallelMultiWriter
- func (p *ParallelMultiWriter) AppendWriter(w io.Writer) int
- func (p *ParallelMultiWriter) AutoInsert(w io.Writer) int
- func (p *ParallelMultiWriter) Capacity() int
- func (p *ParallelMultiWriter) SetWriterAtIndex(i int, w io.Writer)
- func (p *ParallelMultiWriter) Write(b []byte) (n int, err error)
- func (p *ParallelMultiWriter) WriterAtIndex(i int) (io.Writer, error)
- type ReadChannelController
- type RootedReadController
- type RootedReadControllers
- type RootedWriteController
- type RootedWriteControllers
- type Stats
- func (s *Stats) BytesDelta(cur, prev uint64, td time.Duration, resultMode bool) string
- func (s *Stats) CopyTo(d *Stats)
- func (s *Stats) DeltaDataString(dataType int8, d *Stats, td time.Duration, sep string) (out string)
- func (s *Stats) Elapsed() time.Duration
- func (s *Stats) InOut(cur uint32) string
- func (s *Stats) IntDelta(cur, prev uint32, td time.Duration, resultMode bool) string
- func (s *Stats) MostFiles() uint32
- type WriteChannelController
- type WriteCloser
Constants ¶
const (
	SymbolDelta    = "Δ"
	StatsClientSep = " | "

	ElapsedData int8 = 1 << iota
	ReadData
	WriteData
)
const (
	SymbolHash      = "#"
	SymbolWallclock = "WC"
	SymbolTotal     = "T"
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BytesVolume ¶
type BytesVolume uint64
func (BytesVolume) String ¶
func (b BytesVolume) String() string
func (BytesVolume) StringPad ¶
func (b BytesVolume) StringPad(pad string) string
StringPad converts the volume into a nice, human-readable representation. The padding string should be something compatible with the %f format, like "6.2".
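The "%f-compatible" pad string can be understood as the width/precision part of a printf verb. A minimal sketch of that idea, assuming the pad simply becomes part of a `%…f` verb (this is not the package's actual implementation):

```go
package main

import "fmt"

func main() {
	// A pad string like "6.2" yields the verb "%6.2f": total width 6,
	// two digits after the decimal point.
	pad := "6.2"
	bytes := 1536.0
	fmt.Printf("%"+pad+"f KiB\n", bytes/1024.0)
}
```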
type ChannelReader ¶
type ChannelReader struct {
// contains filtered or unexported fields
}
Contains all information about a file or reader to be read
func (*ChannelReader) WriteTo ¶
func (p *ChannelReader) WriteTo(w io.Writer) (n int64, err error)
WriteTo allows a ChannelReader to be used as the source of io.Copy operations. This should be preferred, as it saves a copy operation. WriteTo will block until a reader routine is ready to serve us bytes. Note that the read operation is performed by N reader routines; we just receive the data and pass it on. We also assume that the write blocks until the operation is finished. If you perform non-blocking writes, you must copy the buffer!
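The reason WriteTo saves a copy: io.Copy checks whether its source implements io.WriterTo and, if so, delegates to WriteTo instead of shuttling bytes through its own intermediate buffer. The same mechanism can be observed with strings.Reader, which also implements io.WriterTo:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

func main() {
	// strings.Reader implements io.WriterTo, so io.Copy calls
	// src.WriteTo(&dst) directly, skipping its internal buffer.
	// ChannelReader benefits from the same fast path.
	src := strings.NewReader("hello")
	var dst bytes.Buffer
	n, err := io.Copy(&dst, src)
	fmt.Println(n, err == nil, dst.String())
}
```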
type ChannelWriter ¶
type ChannelWriter struct {
// contains filtered or unexported fields
}
Used in conjunction with a WriteChannelController, serving as a front-end that communicates with the actual writer, which resides in a separate go-routine.
func (*ChannelWriter) Close ¶
func (c *ChannelWriter) Close() error
func (*ChannelWriter) SetWriter ¶
func (c *ChannelWriter) SetWriter(w io.Writer)
SetWriter sets our writer to the given one, which allows ChannelWriters to be reused.
func (*ChannelWriter) Write ¶
func (c *ChannelWriter) Write(b []byte) (n int, err error)
Write sends bytes down our channel and waits for the writer on the other end to finish, retrieving the result.
func (*ChannelWriter) Writer ¶
func (c *ChannelWriter) Writer() io.Writer
type LazyFileWriteCloser ¶
type LazyFileWriteCloser struct {
// contains filtered or unexported fields
}
A writer that creates a new file and any intermediate directories on first write. You must call Close to finish the writes and release system resources.
func (*LazyFileWriteCloser) Close ¶
func (l *LazyFileWriteCloser) Close() error
Close closes our file if it was initialized already. It is therefore safe to call this even if Write was never called.
func (*LazyFileWriteCloser) Path ¶
func (l *LazyFileWriteCloser) Path() string
Path returns the currently set path
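The lazy-creation behaviour can be sketched with the standard library. The type below is an illustrative stand-in, not the package's code; it shows the two documented properties, creation of file and parent directories on first Write, and a Close that is safe when Write was never called:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// lazyFileWriter opens its file (and parent directories) on first Write.
type lazyFileWriter struct {
	path string
	f    *os.File
}

func (l *lazyFileWriter) Write(b []byte) (int, error) {
	if l.f == nil {
		if err := os.MkdirAll(filepath.Dir(l.path), 0755); err != nil {
			return 0, err
		}
		f, err := os.Create(l.path)
		if err != nil {
			return 0, err
		}
		l.f = f
	}
	return l.f.Write(b)
}

// Close is a no-op if Write was never called.
func (l *lazyFileWriter) Close() error {
	if l.f == nil {
		return nil
	}
	return l.f.Close()
}

func main() {
	dir, _ := os.MkdirTemp("", "lazy")
	defer os.RemoveAll(dir)

	w := &lazyFileWriter{path: filepath.Join(dir, "sub", "out.txt")}
	w.Write([]byte("hi"))
	w.Close()

	data, _ := os.ReadFile(w.path)
	fmt.Println(string(data))

	// Closing a writer that never wrote succeeds without side effects.
	unused := &lazyFileWriter{path: filepath.Join(dir, "never.txt")}
	fmt.Println(unused.Close() == nil)
}
```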
type ParallelMultiWriter ¶
type ParallelMultiWriter struct {
// contains filtered or unexported fields
}
A writer which dispatches to multiple destinations, collecting errors on the way and returning the first one it encounters. If a writer fails, it will not be written to anymore until it is closed or replaced using SetWriterAtIndex.
func NewParallelMultiWriter ¶
func NewParallelMultiWriter(writers []io.Writer) *ParallelMultiWriter
func (*ParallelMultiWriter) AppendWriter ¶
func (p *ParallelMultiWriter) AppendWriter(w io.Writer) int
AppendWriter appends the given writer, expanding our internal structures as needed. As this is not thread-safe, you must ensure no Write is in progress while appending. Returns the index at which the new writer was appended.
func (*ParallelMultiWriter) AutoInsert ¶
func (p *ParallelMultiWriter) AutoInsert(w io.Writer) int
AutoInsert inserts the given writer at the first free slot and returns its index, appending if necessary.
func (*ParallelMultiWriter) Capacity ¶
func (p *ParallelMultiWriter) Capacity() int
Capacity returns the maximum number of writers we can store, which is useful when iterating with WriterAtIndex().
func (*ParallelMultiWriter) SetWriterAtIndex ¶
func (p *ParallelMultiWriter) SetWriterAtIndex(i int, w io.Writer)
SetWriterAtIndex places the given writer at the given index. We don't do bounds checking.
func (*ParallelMultiWriter) Write ¶
func (p *ParallelMultiWriter) Write(b []byte) (n int, err error)
Writes will always succeed, even if individual writers may have failed. It is up to the caller to check for errors once the write is finished.
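These semantics, Write never fails, per-writer errors are recorded and failed writers are skipped, can be sketched with the standard library. The names below are hypothetical and this is not the package's implementation:

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"sync"
)

// failingWriter stands in for a broken destination (e.g. a full disk).
type failingWriter struct{}

func (failingWriter) Write(p []byte) (int, error) {
	return 0, errors.New("disk full")
}

// parallelMultiWriter fans each write out to all writers concurrently,
// recording the first error per writer instead of failing the Write call.
type parallelMultiWriter struct {
	writers []io.Writer
	errs    []error
}

func (p *parallelMultiWriter) Write(b []byte) (int, error) {
	var wg sync.WaitGroup
	for i, w := range p.writers {
		if w == nil || p.errs[i] != nil {
			continue // a failed writer is not written to anymore
		}
		wg.Add(1)
		go func(i int, w io.Writer) {
			defer wg.Done()
			if _, err := w.Write(b); err != nil {
				p.errs[i] = err // each goroutine owns its own slot
			}
		}(i, w)
	}
	wg.Wait()
	return len(b), nil // Write itself always succeeds
}

func main() {
	var buf bytes.Buffer
	p := &parallelMultiWriter{
		writers: []io.Writer{&buf, failingWriter{}},
		errs:    make([]error, 2),
	}
	p.Write([]byte("abc"))
	// The healthy writer received the bytes; the broken one recorded
	// its error for the caller to inspect afterwards.
	fmt.Println(buf.String(), p.errs[0] == nil, p.errs[1] != nil)
}
```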
func (*ParallelMultiWriter) WriterAtIndex ¶
func (p *ParallelMultiWriter) WriterAtIndex(i int) (io.Writer, error)
WriterAtIndex returns the writer at the given index, along with the first error that occurred when writing to it. We perform no bounds checking.
type ReadChannelController ¶
type ReadChannelController struct {
// contains filtered or unexported fields
}
func NewReadChannelController ¶
func NewReadChannelController(nprocs int, stats *Stats, done <-chan bool) ReadChannelController
Create a new parallel reader with nprocs go-routines and return a channel to it. Feed the channel with ChannelReader structures and listen on each reader's channel for bytes until EOF, at which point the channel is closed by the reader. Closing the done channel allows long reads to be interrupted.
func (*ReadChannelController) NewChannelReaderFromPath ¶
func (r *ReadChannelController) NewChannelReaderFromPath(path string, mode os.FileMode, buf []byte) *ChannelReader
Return a new ChannelReader that reads from the given path. The buffer must not be shared among multiple channel readers!
func (*ReadChannelController) NewChannelReaderFromReader ¶
func (r *ReadChannelController) NewChannelReaderFromReader(reader io.Reader, buf []byte) *ChannelReader
func (*ReadChannelController) Streams ¶
func (r *ReadChannelController) Streams() int
Streams returns the number of streams we handle in parallel.
type RootedReadController ¶
type RootedReadController struct {
	// The trees the controller should read from
	Trees []string
	// A possibly shared controller which may read from the given trees
	Ctrl ReadChannelController
}
A utility structure to associate trees with a reader. NOTE: Very similar to RootedWriteController!
type RootedReadControllers ¶
type RootedReadControllers []RootedReadController
func NewDeviceReadControllers ¶
func NewDeviceReadControllers(nprocs int, trees []string, stats *Stats, done <-chan bool) RootedReadControllers
A new list of controllers, one per device, each associated with the trees it can handle.
func (RootedReadControllers) Streams ¶
func (rctrls RootedReadControllers) Streams() int
Streams returns the number of streams being handled in parallel. NOTE: Could this be a custom type with just a function? I think so!
type RootedWriteController ¶
type RootedWriteController struct {
	// The trees the controller should write to
	Trees []string
	// A possibly shared controller which may write to the given tree
	Ctrl WriteChannelController
}
A utility structure to associate a tree with a writer. That way, writers can be more easily associated with a device which hosts a particular Tree
type RootedWriteControllers ¶
type RootedWriteControllers []RootedWriteController
func (RootedWriteControllers) Trees ¶
func (wm RootedWriteControllers) Trees() (n int)
Trees returns the total number of trees/destinations we can write to.
type Stats ¶
type Stats struct {
	// PERFORMANCE METRICS
	TotalFilesRead    uint32 // Amount of whole files we read so far
	TotalFilesWritten uint32 // Amount of whole files we wrote so far
	FilesBeingRead    uint32 // Amount of files currently being read
	FilesBeingWritten uint32 // Amount of files currently being written
	BytesRead         uint64 // Total of bytes read so far, counting all input streams
	BytesWritten      uint64 // Total of bytes written so far, counting all output streams

	// GENERAL INFORMATION
	StartedAt time.Time // The time at which we started processing
}
A shared structure that is modified using atomic operations to keep track of what data the IO system is currently processing.
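The atomic-update pattern such counters require can be sketched with sync/atomic. The variable names mirror the Stats fields but this is an illustrative sketch, not the package's code:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	// Counters touched by many reader/writer goroutines must be
	// updated atomically; a plain += would be a data race.
	var bytesRead uint64
	var filesBeingRead uint32

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddUint32(&filesBeingRead, 1) // file opened
			atomic.AddUint64(&bytesRead, 1024)
			// Adding ^uint32(0) (i.e. -1 in two's complement) is the
			// documented idiom for an atomic decrement.
			atomic.AddUint32(&filesBeingRead, ^uint32(0)) // file done
		}()
	}
	wg.Wait()
	fmt.Println(atomic.LoadUint64(&bytesRead), atomic.LoadUint32(&filesBeingRead))
}
```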
func (*Stats) BytesDelta ¶
func (*Stats) CopyTo ¶
CopyTo atomically copies our fields to the destination structure. It reads the fields atomically and writes them to the destination using standard assignments.
func (*Stats) DeltaDataString ¶
DeltaDataString prints performance metrics as a single line for the given data type, including deltas of relevant metrics compared to the last state d. You also give the temporal distance td which separates this Stats from the previous one. If you pass s itself as d, this indicates result mode, which assumes you want the overall average throughput. sep is the separator to use between fields. dataType is a combination of ElapsedData|ReadData|WriteData; the respective field may be unset if there is no data for it.
type WriteChannelController ¶
type WriteChannelController struct {
// contains filtered or unexported fields
}
A utility to help control how many writes we perform in parallel.
func NewWriteChannelController ¶
func NewWriteChannelController(nprocs, channelCap int, stats *Stats) WriteChannelController
Create a new controller which handles all incoming write requests with nprocs go-routines. Use the channel capacity to reduce blocking; a good value depends heavily on your algorithm's patterns, but it should be at least nprocs, or larger.
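Why channel capacity reduces blocking: sends on a buffered channel only block once the buffer is full, so producers can queue up to channelCap write requests while the nprocs consumers drain them. A minimal sketch (names are hypothetical):

```go
package main

import "fmt"

func main() {
	// With capacity channelCap, the first channelCap sends complete
	// immediately even though no consumer has received anything yet.
	const channelCap = 3
	requests := make(chan []byte, channelCap)

	for i := 0; i < channelCap; i++ {
		requests <- []byte{byte(i)} // none of these sends block
	}
	// A fourth unbuffered-style send would block here until a
	// consumer goroutine received from the channel.
	fmt.Println(len(requests), cap(requests))
}
```

This is why a capacity of at least nprocs is suggested: it keeps every consumer goroutine supplied with work even when producers arrive in bursts.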
func (*WriteChannelController) InitChannelWriters ¶
func (w *WriteChannelController) InitChannelWriters(out []ChannelWriter)
InitChannelWriters initializes as many new ChannelWriters as fit into the given slice. You will have to set each one's writer with SetWriter before using it.
func (*WriteChannelController) Streams ¶
func (w *WriteChannelController) Streams() int
Streams returns the number of streams we handle in parallel.
type WriteCloser ¶
type WriteCloser interface {
	io.WriteCloser

	// Writer returns the writer this interface instance contains
	Writer() io.Writer
}
Like the io.WriteCloser interface, but allows retrieving more information specific to our usage.