Documentation ¶
Overview ¶
Package recordio implements a straightforward self-synchronizing log using consistent-overhead word stuffing (yes, COWS) as described in Paul Khuong's https://www.pvk.ca/Blog/2021/01/11/stuff-your-logs/.
Stuffed Logs ¶
This package contains a very simple Decoder and Encoder that can be used to write delimited records into a log. It has no opinions about the content of those records.
Thus, you can write arbitrary bytes to an Encoder and it will delimit them appropriately, in a way that guarantees by construction that the delimiter never appears anywhere inside the encoded record. This makes it a self-synchronizing file format: you can always find the next record by searching for the delimiter.
Using the Decoder/Encoder interface, of course, does not require understanding what it is doing underneath. If you want to write to a file, you can simply open it for appending and wrap it in an Encoder:
	f, err := os.OpenFile(mypath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0755)
	if err != nil {
		log.Fatalf("Error opening: %v", err)
	}
	e := NewEncoder(f)
	defer e.Close()
	msgs := []string{
		"msg 1",
		"msg 2",
		"msg 3",
	}
	for _, msg := range msgs {
		// Encode takes the record as a byte slice.
		if _, err := e.Encode([]byte(msg)); err != nil {
			log.Fatalf("Error appending: %v", err)
		}
	}
Reading from an existing log is similarly simple:
	f, err := os.Open(mypath)
	if err != nil {
		log.Fatalf("Error opening: %v", err)
	}
	d := NewDecoder(f)
	defer d.Close()
	for !d.Done() {
		b, err := d.Next()
		if err != nil {
			log.Fatalf("Error reading: %v", err)
		}
		fmt.Println(string(b))
	}
Sharded Parallel Reads ¶
The interfaces here are explicitly designed to allow many of the use cases outlined in the article above, including direct support of parallel sharded reads.
To manage sharded reads, you might structure code something like this.
	func processShard(r io.Reader, nominalLength int64) error {
		d := NewDecoder(r)
		// If we're in the middle of a record, skip to the next full one.
		if err := d.SkipPartial(); err != nil {
			return fmt.Errorf("process shard skip: %w", err)
		}
		// Read until finished or until we exceed the shard length.
		// The final record inside the shard is likely going to extend
		// past the length a little, which is fine.
		// This assumes Consumed returns an int64, as MultiDecoder.Consumed does.
		for !d.Done() && d.Consumed() < nominalLength {
			b, err := d.Next()
			if err != nil {
				return fmt.Errorf("process shard next: %w", err)
			}
			_ = b // PROCESS b HERE
		}
		return nil
	}

	func main() {
		const (
			path   = "/path/to/log"
			shards = 2
		)
		stat, err := os.Stat(path)
		if err != nil {
			log.Fatalf("Can't stat %q: %v", path, err)
		}
		shardSize := stat.Size() / shards
		var g errgroup.Group
		for i := 0; i < shards; i++ {
			i := i // local for use in closures.
			g.Go(func() error {
				f, err := os.Open(path)
				if err != nil {
					return fmt.Errorf("open: %w", err)
				}
				defer f.Close()
				seekPos := int64(i) * shardSize
				if _, err := f.Seek(seekPos, io.SeekStart); err != nil {
					return fmt.Errorf("seek: %w", err)
				}
				size := shardSize
				if i == shards-1 {
					// The last shard takes whatever remains of the file.
					size = stat.Size() - seekPos
				}
				return processShard(f, size)
			})
		}
		if err := g.Wait(); err != nil {
			log.Fatal(err)
		}
	}
The key idea in the above code is that the Consumed method returns how many actual underlying bytes have contributed to record output thus far. When more than the shard length has been consumed, that shard is finished. Simply opening the file multiple times for reading and seeking provides the appropriate io.Reader interface for each shard.
MultiDecoder ¶
If you wish to implement, say, a write-ahead log over multiple ordered readers (effectively concatenating them), there is a MultiDecoder implementation contained here. There is also a handy file iterator that can be used to provide on-demand file opening for the MultiDecoder.
Constants ¶
This section is empty.
Variables ¶
var (
	// CorruptRecord errors are returned (wrapped, use errors.Is to detect)
	// when a situation is encountered that can't happen in a clean record.
	// It is usually safe to skip after receiving this error, provided that a
	// missing entry doesn't cause consistency issues for the reader.
	CorruptRecord = fmt.Errorf("corrupt record")
)
var (
Reserved = []byte{0xfe, 0xfd}
)
Functions ¶
This section is empty.
Types ¶
type Decoder ¶
type Decoder struct {
// contains filtered or unexported fields
}
Decoder wraps an io.Reader and allows full records to be pulled at once.
Example ¶
	dec := NewDecoder(bytes.NewBuffer([]byte("\xfe\xfd\x14A very short message\x00\x00\xfe\xfd\x05hello")))
	for !dec.Done() {
		b, err := dec.Next()
		if err != nil {
			log.Fatalf("Error reading: %v", err)
		}
		fmt.Printf("%q\n", string(b))
	}
Output:
	"A very short message\xfe\xfd"
	"hello"
func NewDecoder ¶
NewDecoder creates a Decoder from the given src, which is assumed to be a word-stuffed log.
func (*Decoder) Consumed ¶
Consumed returns the number of bytes from the underlying stream that have contributed to record output so far. It counts bytes used, not bytes merely read from the stream.
func (*Decoder) Done ¶
Done indicates whether the underlying stream is exhausted and all records have been returned.
func (*Decoder) Next ¶
Next returns the next record in the underlying stream, or an error. It begins by consuming the stream until it finds a delimiter (requiring each record to start with one), so even if there was an error in a previous record, this can skip bytes until it finds a new one. It does not require the first record to begin with a delimiter. Returns a wrapped io.EOF when complete. More idiomatically, check Done after every iteration.
func (*Decoder) SkipPartial ¶
SkipPartial moves forward through the log until it finds a delimiter, if it isn't already on one. Can be used, for example, to get shards started on a record boundary without first getting a corruption error.
Example ¶
	dec := NewDecoder(bytes.NewBuffer([]byte("middle-of-record\xfe\xfd\x02AB")))
	if err := dec.SkipPartial(); err != nil {
		log.Fatalf("Error skipping partial content: %v", err)
	}
	for !dec.Done() {
		b, err := dec.Next()
		if err != nil {
			log.Fatalf("Error reading after partial skip: %v", err)
		}
		fmt.Printf("%q\n", string(b))
	}
Output:
	"AB"
type DecoderIterator ¶
DecoderIterator is an iterator over Decoders, for use with the MultiDecoder.
type Encoder ¶
type Encoder struct {
// contains filtered or unexported fields
}
Encoder wraps an underlying writer and appends records to the stream when requested, encoding them using consistent-overhead word stuffing.
Example ¶
	buf := new(bytes.Buffer)
	enc := NewEncoder(buf)
	if _, err := enc.Encode([]byte("A very short message")); err != nil {
		log.Fatalf("Error appending: %v", err)
	}
	fmt.Printf("%q\n", buf.String())
Output:
	"\xfe\xfd\x14A very short message"
func NewEncoder ¶
NewEncoder creates a new Encoder with the underlying output writer.
type FilesDecoderIterator ¶
type FilesDecoderIterator struct {
// contains filtered or unexported fields
}
FilesDecoderIterator is an iterator over readers based on a list of file names.
func NewFilesDecoderIterator ¶
func NewFilesDecoderIterator(fsys fs.FS, names []string) *FilesDecoderIterator
NewFilesDecoderIterator creates a new iterator from a list of file names.
func (*FilesDecoderIterator) Close ¶
func (d *FilesDecoderIterator) Close() error
Close closes the last file, if there is one open. After Close, the iterator returns io.EOF on every subsequent call.
func (*FilesDecoderIterator) Done ¶
func (d *FilesDecoderIterator) Done() bool
Done returns true iff there are no more readers to produce.
func (*FilesDecoderIterator) Next ¶
func (d *FilesDecoderIterator) Next() (*Decoder, error)
Next returns a new reader if possible, or io.EOF.
type MultiDecoder ¶
type MultiDecoder struct {
// contains filtered or unexported fields
}
MultiDecoder wraps multiple readers, either specified directly or through a reader iterator interface that can produce them on demand. This is useful when concatenating multiple file shards together as a single record store.
Example ¶
	package main

	import (
		"bytes"
		"fmt"
		"log"
		"testing/fstest"

		"entrogo.com/stuffedio/recordio"
	)

	const prefix = "journal"

	func fakeJournalData(start, end uint64) (string, []byte) {
		buf := new(bytes.Buffer)
		enc := recordio.NewEncoder(buf)
		defer enc.Close()
		for i := start; i < end; i++ {
			if _, err := enc.Encode([]byte(fmt.Sprintf("Record with number %d", i))); err != nil {
				log.Fatalf("Error appending: %v", err)
			}
		}
		return fmt.Sprintf("%s-%016x", prefix, start), buf.Bytes()
	}

	func main() {
		// Create a fake file system with some data in it.
		fakeFS := make(fstest.MapFS)
		var names []string
		ends := []uint64{3, 5, 7}
		start := uint64(1)
		for _, end := range ends {
			name, val := fakeJournalData(start, end)
			names = append(names, name)
			fakeFS[name] = &fstest.MapFile{Data: val}
			start = end
		}

		// Create a MultiDecoder WAL that knows about these files, using a FilesDecoderIterator.
		dec := recordio.NewMultiDecoderIter(recordio.NewFilesDecoderIterator(fakeFS, names))
		defer dec.Close()

		// Read entries in order.
		for !dec.Done() {
			val, err := dec.Next()
			if err != nil {
				log.Fatalf("Error reading next value: %v", err)
			}
			fmt.Printf("%q\n", val)
		}
	}
Output:
	"Record with number 1"
	"Record with number 2"
	"Record with number 3"
	"Record with number 4"
	"Record with number 5"
	"Record with number 6"
func NewMultiDecoder ¶
func NewMultiDecoder(readers []*Decoder) *MultiDecoder
NewMultiDecoder creates a MultiDecoder from a slice of readers. These are ordered, and will be consumed in the order given.
func NewMultiDecoderIter ¶
func NewMultiDecoderIter(ri DecoderIterator) *MultiDecoder
NewMultiDecoderIter creates a MultiDecoder where each reader is requested, one at a time, through the given DecoderIterator. The iterator is expected to return a nil Decoder if there are no more readers.
func (*MultiDecoder) Close ¶
func (d *MultiDecoder) Close() error
Close closes the currently busy underlying reader and reader iterator, if any.
func (*MultiDecoder) Consumed ¶
func (d *MultiDecoder) Consumed() int64
Consumed returns the total number of bytes consumed thus far.
func (*MultiDecoder) Done ¶
func (d *MultiDecoder) Done() bool
Done returns whether this multi reader has exhausted all underlying readers.
func (*MultiDecoder) Next ¶
func (d *MultiDecoder) Next() ([]byte, error)
Next gets the next record for these readers.
type ReverseDecoder ¶
type ReverseDecoder struct {
// contains filtered or unexported fields
}
ReverseDecoder can be used on an io.ReaderAt to read stuffed records in reverse. It does so by searching backwards for delimiters, and then reading them forward from there. Reproduces errors in the same way that a forward reader would.
Example ¶
	msgs := []string{
		"Message 1",
		"Message 2",
		"Message 3",
		"Message 4",
	}
	buf := new(bytes.Buffer)
	enc := NewEncoder(buf)
	for _, msg := range msgs {
		if _, err := enc.Encode([]byte(msg)); err != nil {
			log.Fatalf("Error appending: %v", err)
		}
	}
	dec := NewReverseDecoder(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
	for !dec.Done() {
		b, err := dec.Next()
		if err != nil {
			log.Fatalf("Error reading in reverse: %v", err)
		}
		fmt.Printf("%q\n", b)
	}
Output:
	"Message 4"
	"Message 3"
	"Message 2"
	"Message 1"
func NewReverseDecoder ¶
func NewReverseDecoder(r io.ReaderAt, size int64) *ReverseDecoder
NewReverseDecoder creates a new decoder that works in reverse. Because io.ReaderAt doesn't supply a size, and there are no good standard interfaces to depend on for this, the caller must indicate how long the underlying data is. This allows the reverse decoder to start at the end.
func (*ReverseDecoder) Close ¶
func (d *ReverseDecoder) Close() error
Close closes the underlying reader if it is also an io.Closer.
func (*ReverseDecoder) Done ¶
func (d *ReverseDecoder) Done() bool
Done indicates whether this reverse decoder has reached (and produced) the first record in the underlying reader.
func (*ReverseDecoder) Next ¶
func (d *ReverseDecoder) Next() ([]byte, error)
Next attempts to find and produce the next record in reverse in the underlying stream.