io

package
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 4, 2014 License: LGPL-3.0 Imports: 9 Imported by: 4

Documentation

Overview

Package io provides asynchronous reader and writer implementations.

Next to hashing, the bulk of the time is spent on input and output operations. For devices to achieve their maximum bandwidth, it's important not to stress them with too many parallel IO operations, which makes it necessary to control the number of input/output streams. This is the key feature of the ReadChannelController and WriteChannelController types implemented here.

Similar to io.MultiWriter, there is a ParallelMultiWriter implementation which sends bytes to multiple writers at once.

Index

Constants

const (
	SymbolDelta         = "Δ"
	StatsClientSep      = " | "
	ElapsedData    int8 = 1 << iota
	ReadData
	WriteData
)
const (
	SymbolHash      = "#"
	SymbolWallclock = "WC"
	SymbolTotal     = "T"
)

Variables

This section is empty.

Functions

func DeviceMap

func DeviceMap(paths []string) [][]string

DeviceMap maps the given paths to their device IDs, effectively grouping them by device. We use a simple array for this as the actual device IDs are not relevant.

func NewUncheckedParallelMultiWriter

func NewUncheckedParallelMultiWriter(writers ...io.Writer) io.Writer

Types

type BytesVolume

type BytesVolume uint64

func (BytesVolume) String

func (b BytesVolume) String() string

func (BytesVolume) StringPad

func (b BytesVolume) StringPad(pad string) string

StringPad converts the volume into a nice, human-readable representation. The padding string should be compatible with the %f format, like "6.2".

type ChannelReader

type ChannelReader struct {
	// contains filtered or unexported fields
}

Contains all information about a file or reader to be read.

func (*ChannelReader) WriteTo

func (p *ChannelReader) WriteTo(w io.Writer) (n int64, err error)

WriteTo allows a ChannelReader to be used as the source of io.Copy operations. This should be preferred as it saves a copy operation. WriteTo blocks until a reader is ready to serve us bytes. Note that the read operation is performed by N reader routines; we just receive the data and pass it on. We also assume that the write blocks until the operation is finished. If you perform non-blocking writes, you must copy the buffer!

type ChannelWriter

type ChannelWriter struct {
	// contains filtered or unexported fields
}

Used in conjunction with a WriteChannelController, serving as a front end that communicates with the actual writer, which resides in a separate goroutine.

func (*ChannelWriter) Close

func (c *ChannelWriter) Close() error

func (*ChannelWriter) SetWriter

func (c *ChannelWriter) SetWriter(w io.Writer)

SetWriter sets our writer to the given one, allowing ChannelWriters to be reused.

func (*ChannelWriter) Write

func (c *ChannelWriter) Write(b []byte) (n int, err error)

Write sends bytes down our channel and waits for the writer on the other end to finish, retrieving the result.

func (*ChannelWriter) Writer

func (c *ChannelWriter) Writer() io.Writer

type LazyFileWriteCloser

type LazyFileWriteCloser struct {
	// contains filtered or unexported fields
}

A writer that will create a new file and intermediate directories on the first write. You must call Close to finish the writes and release system resources.

func (*LazyFileWriteCloser) Close

func (l *LazyFileWriteCloser) Close() error

Close closes our writer if it was initialized; it is therefore safe to call even if Write wasn't called beforehand.

func (*LazyFileWriteCloser) Path

func (l *LazyFileWriteCloser) Path() string

Path returns the currently set path

func (*LazyFileWriteCloser) SetPath

func (l *LazyFileWriteCloser) SetPath(p string, mode os.FileMode)

SetPath changes the path to the given one. It's an error to set a new path while the previous writer hasn't been closed yet.

func (*LazyFileWriteCloser) Write

func (l *LazyFileWriteCloser) Write(b []byte) (n int, err error)

type ParallelMultiWriter

type ParallelMultiWriter struct {
	// contains filtered or unexported fields
}

A writer which dispatches to multiple destinations, collecting errors on the way and returning the first one it encounters. If a writer fails, it will not be written to anymore until it is closed or reset using SetWriter.

func NewParallelMultiWriter

func NewParallelMultiWriter(writers []io.Writer) *ParallelMultiWriter

func (*ParallelMultiWriter) AppendWriter

func (p *ParallelMultiWriter) AppendWriter(w io.Writer) int

AppendWriter appends the given writer, expanding our internal structures as needed, and returns the index at which the new writer was appended. As this is not thread-safe, you must assure no write is in progress while calling it.

func (*ParallelMultiWriter) AutoInsert

func (p *ParallelMultiWriter) AutoInsert(w io.Writer) int

AutoInsert inserts the given writer at the first free slot and returns its index, appending if necessary.

func (*ParallelMultiWriter) Capacity

func (p *ParallelMultiWriter) Capacity() int

Capacity returns the maximum number of writers we can store, useful for iterating with WriterAtIndex().

func (*ParallelMultiWriter) SetWriterAtIndex

func (p *ParallelMultiWriter) SetWriterAtIndex(i int, w io.Writer)

SetWriterAtIndex sets the given writer at the given index. We don't do bounds checking.

func (*ParallelMultiWriter) Write

func (p *ParallelMultiWriter) Write(b []byte) (n int, err error)

Write will always succeed, even if individual writers may have failed; it's up to the user to check for errors when the write is finished.

func (*ParallelMultiWriter) WriterAtIndex

func (p *ParallelMultiWriter) WriterAtIndex(i int) (io.Writer, error)

WriterAtIndex returns the writer at the given index, along with the first error that may have occurred when writing to it. We perform no bounds checking.

type ReadChannelController

type ReadChannelController struct {
	// contains filtered or unexported fields
}

func NewReadChannelController

func NewReadChannelController(nprocs int, stats *Stats, done <-chan bool) ReadChannelController

Create a new parallel reader with nprocs goroutines and return a channel to it. Feed the channel with ChannelReader structures and listen on each one's channel to read bytes until EOF, at which point the channel will be closed by the reader. The done channel allows long reads to be interrupted by closing it.

func (*ReadChannelController) NewChannelReaderFromPath

func (r *ReadChannelController) NewChannelReaderFromPath(path string, mode os.FileMode, buf []byte) *ChannelReader

NewChannelReaderFromPath returns a new channel reader for the given path. The buffer must not be shared among multiple channel readers!

func (*ReadChannelController) NewChannelReaderFromReader

func (r *ReadChannelController) NewChannelReaderFromReader(reader io.Reader, buf []byte) *ChannelReader

func (*ReadChannelController) Streams

func (r *ReadChannelController) Streams() int

Streams returns the number of streams we handle in parallel.

type RootedReadController

type RootedReadController struct {
	// The trees the controller should read from
	Trees []string

	// A possibly shared controller which may read from the given trees
	Ctrl ReadChannelController
}

A utility structure to associate trees with a reader. NOTE: Very similar to RootedWriteController!

type RootedReadControllers

type RootedReadControllers []RootedReadController

func NewDeviceReadControllers

func NewDeviceReadControllers(nprocs int, trees []string, stats *Stats, done <-chan bool) RootedReadControllers

NewDeviceReadControllers returns a new list of controllers, one per device, each associated with the trees it can handle.

func (RootedReadControllers) Streams

func (rctrls RootedReadControllers) Streams() int

Streams returns the number of streams being handled in parallel. NOTE: Can this be a custom type with just a function? I think so!

type RootedWriteController

type RootedWriteController struct {
	// The trees the controller should write to
	Trees []string

	// A possibly shared controller which may write to the given tree
	Ctrl WriteChannelController
}

A utility structure to associate a tree with a writer. That way, writers can be more easily associated with the device which hosts a particular tree.

type RootedWriteControllers

type RootedWriteControllers []RootedWriteController

func (RootedWriteControllers) Trees

func (wm RootedWriteControllers) Trees() (n int)

Trees returns the total number of trees/destinations we can write to.

type Stats

type Stats struct {
	// PERFORMANCE METRICS
	TotalFilesRead    uint32 // Number of whole files we have read so far
	TotalFilesWritten uint32 // Number of whole files we have written so far
	FilesBeingRead    uint32 // Number of files currently being read
	FilesBeingWritten uint32 // Number of files currently being written
	BytesRead         uint64 // Total bytes read so far, counting all input streams
	BytesWritten      uint64 // Total bytes written so far, counting all output streams

	// GENERAL INFORMATION
	StartedAt time.Time // The time at which we started processing

}

A shared structure that is modified using atomic operations to keep track of what data the IO system is currently processing.

func (*Stats) BytesDelta

func (s *Stats) BytesDelta(cur, prev uint64, td time.Duration, resultMode bool) string

func (*Stats) CopyTo

func (s *Stats) CopyTo(d *Stats)

CopyTo will atomically copy our fields to the destination structure: it reads the fields atomically and writes them using standard means.

func (*Stats) DeltaDataString

func (s *Stats) DeltaDataString(dataType int8, d *Stats, td time.Duration, sep string) (out string)

DeltaDataString prints performance metrics as a single line with the given data type, including deltas of the relevant metrics compared to the last state d. td is the temporal distance which separates this stat from the previous one. If you pass s as d, this indicates result mode, which assumes you want the overall average throughput. sep is the separator to use between fields. dataType is a combination of ElapsedData|ReadData|WriteData; the respective field can be unset in case there is no data for it.

func (*Stats) Elapsed

func (s *Stats) Elapsed() time.Duration

Elapsed returns the amount of time elapsed since we started the operation.

func (*Stats) InOut

func (s *Stats) InOut(cur uint32) string

func (*Stats) IntDelta

func (s *Stats) IntDelta(cur, prev uint32, td time.Duration, resultMode bool) string

func (*Stats) MostFiles

func (s *Stats) MostFiles() uint32

MostFiles returns the greater number of files, either the ones that were read or the ones that were written.

type WriteChannelController

type WriteChannelController struct {
	// contains filtered or unexported fields
}

A utility to help control how many writes we attempt in parallel.

func NewWriteChannelController

func NewWriteChannelController(nprocs, channelCap int, stats *Stats) WriteChannelController

Create a new controller which deals with writing all incoming requests using nprocs goroutines. Use the channel capacity to reduce blocking; a good value depends heavily on your algorithm's patterns, but it should be at least nprocs or larger.

func (*WriteChannelController) InitChannelWriters

func (w *WriteChannelController) InitChannelWriters(out []ChannelWriter)

InitChannelWriters initializes as many new ChannelWriters as fit into the given slice. You will have to set each one's writer before using it.

func (*WriteChannelController) Streams

func (w *WriteChannelController) Streams() int

Streams returns the number of streams we handle in parallel.

type WriteCloser

type WriteCloser interface {
	io.WriteCloser

	// Writer returns the writer this interface instance contains
	Writer() io.Writer
}

Like the io.WriteCloser interface, but allows retrieving more information specific to our usage.
