recordio

package · v0.7.10 · Published: Aug 27, 2021 · License: Apache-2.0

Documentation

Overview

Package recordio implements a straightforward self-synchronizing log using consistent-overhead word stuffing (yes, COWS) as described in Paul Khuong's https://www.pvk.ca/Blog/2021/01/11/stuff-your-logs/.

Stuffed Logs

This package contains a very simple Decoder and Encoder that can be used to write delimited records into a log. It has no opinions about the content of those records.

Thus, you can write arbitrary bytes to an Encoder and it will delimit them appropriately, and in a way that---by construction---guarantees that the delimiter will not appear anywhere in the record. This makes it a self-synchronizing file format: you can always find the next record by searching for the delimiter.
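For example, a record that happens to contain the delimiter bytes is stuffed so that the delimiter survives only as a record separator. A minimal sketch (using the package's Encoder and the Reserved delimiter documented below):

buf := new(bytes.Buffer)
enc := NewEncoder(buf)

// This record contains the reserved delimiter bytes 0xfe 0xfd.
if _, err := enc.Encode([]byte("contains \xfe\xfd inside")); err != nil {
  log.Fatalf("Error encoding: %v", err)
}

// Skip the leading delimiter; the stuffed body that follows never
// contains the delimiter sequence, so a reader can always resynchronize.
body := buf.Bytes()[len(Reserved):]
fmt.Println(bytes.Contains(body, Reserved)) // false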

Using the Decoder/Encoder interface, of course, does not require understanding what it is doing underneath. If you want to write to a file, you can simply open it for appending and wrap it in an Encoder:

f, err := os.OpenFile(mypath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0755)
if err != nil {
  log.Fatalf("Error opening: %v", err)
}
e := NewEncoder(f)
defer e.Close()

msgs := []string{
  "msg 1",
  "msg 2",
  "msg 3",
}

for _, msg := range msgs {
  if _, err := e.Encode([]byte(msg)); err != nil {
    log.Fatalf("Error appending: %v", err)
  }
}

Reading from an existing log is similarly simple:

f, err := os.Open(mypath)
if err != nil {
  log.Fatalf("Error opening: %v", err)
}
d := NewDecoder(f)
defer d.Close()

for !d.Done() {
  b, err := d.Next()
  if err != nil {
    log.Fatalf("Error reading: %v", err)
  }
  fmt.Println(string(b))
}

Sharded Parallel Reads

The interfaces here are explicitly designed to allow many of the use cases outlined in the article above, including direct support of parallel sharded reads.

To manage sharded reads, you might structure code something like this.

func processShard(r io.Reader, nominalLength int64) error {
  d := NewDecoder(r)
  // If we're in the middle of a record, skip to the next full one.
  if err := d.SkipPartial(); err != nil {
    return fmt.Errorf("process shard skip: %w", err)
  }
  // Read until finished or until we exceed the shard length.
  // The final record inside the shard is likely going to extend
  // past the length a little, which is fine.
  for !d.Done() && d.Consumed() < nominalLength {
    b, err := d.Next()
    if err != nil {
      return fmt.Errorf("process shard next: %w", err)
    }
    // PROCESS b HERE
    _ = b
  }
  return nil
}

func main() {
  const (
    path   = "/path/to/log"
    shards = 2
  )
  stat, err := os.Stat(path)
  if err != nil {
    log.Fatalf("Can't stat %q: %v", path, err)
  }
  shardSize := stat.Size() / shards

  g, _ := errgroup.WithContext(context.Background())

  for i := 0; i < shards; i++ {
    i := i // local for use in closures.
    g.Go(func() error {
      f, err := os.Open(path)
      if err != nil {
        return fmt.Errorf("open: %w", err)
      }
      defer f.Close()
      seekPos := int64(i) * shardSize
      if _, err := f.Seek(seekPos, io.SeekStart); err != nil {
        return fmt.Errorf("seek: %w", err)
      }
      size := shardSize
      if i == shards-1 {
        size = stat.Size() - seekPos
      }
      return processShard(f, size)
    })
  }

  if err := g.Wait(); err != nil {
    log.Fatal(err)
  }
}

The key idea in the above code is that the Consumed method returns how many actual underlying bytes have contributed to record output thus far. When more than the shard length has been consumed, that shard is finished. Simply opening the file multiple times for reading and seeking provides the appropriate io.Reader interface for each shard.

MultiDecoder

If you wish to implement, say, a write-ahead log over multiple ordered readers (effectively concatenating them), there is a MultiDecoder implementation contained here. There is also a handy file iterator that can be used to provide on-demand file opening for the MultiDecoder.
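A sketch of that wiring (the directory and file names here are hypothetical; a complete runnable example appears with MultiDecoder below):

names := []string{"journal-0001", "journal-0002"} // hypothetical shard names
d := NewMultiDecoderIter(NewFilesDecoderIterator(os.DirFS("/path/to/wal"), names))
defer d.Close()

for !d.Done() {
  b, err := d.Next()
  if err != nil {
    log.Fatalf("Error reading: %v", err)
  }
  // PROCESS b HERE
  _ = b
}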


Constants

This section is empty.

Variables

var (
	// CorruptRecord errors are returned (wrapped, use errors.Is to detect)
	// when a situation is encountered that can't happen in a clean record.
	// It is usually safe to skip after receiving this error, provided that a
	// missing entry doesn't cause consistency issues for the reader.
	CorruptRecord = fmt.Errorf("corrupt record")
)

var (
	// Reserved contains the delimiter bytes; stuffing guarantees that
	// this sequence never appears inside an encoded record body.
	Reserved = []byte{0xfe, 0xfd}
)
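
Because CorruptRecord errors are wrapped, errors.Is detects them. A sketch of a tolerant read loop (assuming a Decoder d, and that dropping a damaged record is acceptable for the application):

for !d.Done() {
	b, err := d.Next()
	if errors.Is(err, CorruptRecord) {
		// Skip the damaged record; the next call to Next will
		// resynchronize on the following delimiter.
		continue
	}
	if err != nil {
		log.Fatalf("Error reading: %v", err)
	}
	fmt.Println(string(b))
}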

Functions

This section is empty.

Types

type Decoder

type Decoder struct {
	// contains filtered or unexported fields
}

Decoder wraps an io.Reader and allows full records to be pulled at once.

Example
dec := NewDecoder(bytes.NewBuffer([]byte("\xfe\xfd\x14A very short message\x00\x00\xfe\xfd\x05hello")))
for !dec.Done() {
	b, err := dec.Next()
	if err != nil {
		log.Fatalf("Error reading: %v", err)
	}
	fmt.Printf("%q\n", string(b))
}
Output:

"A very short message\xfe\xfd"
"hello"

func NewDecoder

func NewDecoder(src io.Reader) *Decoder

NewDecoder creates a Decoder from the given src, which is assumed to be a word-stuffed log.

func (*Decoder) Close

func (d *Decoder) Close() error

Close closes the underlying stream, if it happens to implement io.Closer.

func (*Decoder) Consumed

func (d *Decoder) Consumed() int64

Consumed returns the number of bytes consumed from the underlying stream (bytes actually used to produce records, not merely read).

func (*Decoder) Done

func (d *Decoder) Done() bool

Done indicates whether the underlying stream is exhausted and all records are returned.

func (*Decoder) Next

func (d *Decoder) Next() ([]byte, error)

Next returns the next record in the underlying stream, or an error. It begins by consuming the stream until it finds a delimiter (requiring each record to start with one), so even if there was an error in a previous record, it can skip bytes until it finds a new one. It does not require the first record to begin with a delimiter. Returns a wrapped io.EOF when complete. More idiomatically, check Done after every iteration.

func (*Decoder) SkipPartial

func (d *Decoder) SkipPartial() error

SkipPartial moves forward through the log until it finds a delimiter, if it isn't already on one. Can be used, for example, to get shards started on a record boundary without first getting a corruption error.

Example
dec := NewDecoder(bytes.NewBuffer([]byte("middle-of-record\xfe\xfd\x02AB")))
if err := dec.SkipPartial(); err != nil {
	log.Fatalf("Error skipping partial content: %v", err)
}
for !dec.Done() {
	b, err := dec.Next()
	if err != nil {
		log.Fatalf("Error reading after partial skip: %v", err)
	}
	fmt.Printf("%q\n", string(b))
}
Output:

"AB"

type DecoderIterator

type DecoderIterator interface {
	io.Closer

	Next() (*Decoder, error)
	Done() bool
}

DecoderIterator is an iterator over Decoders, for use with the MultiDecoder.
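Any type with these methods works; for instance, a minimal in-memory iterator might look like this (a sketch; bufferIterator is hypothetical, not part of the package):

type bufferIterator struct {
	bufs []*bytes.Buffer
	pos  int
}

// Next produces a Decoder over the next buffer, or io.EOF when exhausted.
func (it *bufferIterator) Next() (*Decoder, error) {
	if it.Done() {
		return nil, io.EOF
	}
	d := NewDecoder(it.bufs[it.pos])
	it.pos++
	return d, nil
}

// Done reports whether all buffers have been produced.
func (it *bufferIterator) Done() bool { return it.pos >= len(it.bufs) }

// Close satisfies io.Closer; there is nothing to release for buffers.
func (it *bufferIterator) Close() error { return nil }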

type Encoder

type Encoder struct {
	// contains filtered or unexported fields
}

Encoder wraps an underlying writer and appends records to the stream when requested, encoding them using consistent-overhead word stuffing.

Example
buf := new(bytes.Buffer)

enc := NewEncoder(buf)
if _, err := enc.Encode([]byte("A very short message")); err != nil {
	log.Fatalf("Error appending: %v", err)
}

fmt.Printf("%q\n", buf.String())
Output:

"\xfe\xfd\x14A very short message"

func NewEncoder

func NewEncoder(dest io.Writer) *Encoder

NewEncoder creates a new Encoder with the underlying output writer.

func (*Encoder) Close

func (e *Encoder) Close() error

Close cleans up the underlying stream. If the underlying stream is also an io.Closer, it will be closed.

func (*Encoder) Encode

func (e *Encoder) Encode(p []byte) (int, error)

Encode adds a record to the end of the underlying writer. It encodes it using word stuffing. It returns the actual number of bytes written, which will always be slightly more than requested if there is no error.
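The overhead is visible by comparing the input length with the return value; in the Encoder example above, the 20-byte payload yields 23 written bytes (2 delimiter bytes plus a length byte for this short record). A quick sketch:

buf := new(bytes.Buffer)
enc := NewEncoder(buf)
n, err := enc.Encode([]byte("A very short message")) // 20 payload bytes
if err != nil {
	log.Fatalf("Error encoding: %v", err)
}
fmt.Println(n, buf.Len()) // 23 23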

type FilesDecoderIterator

type FilesDecoderIterator struct {
	// contains filtered or unexported fields
}

FilesDecoderIterator is an iterator over Decoders, backed by a list of file names.

func NewFilesDecoderIterator

func NewFilesDecoderIterator(fsys fs.FS, names []string) *FilesDecoderIterator

NewFilesDecoderIterator creates a new iterator from a list of file names.

func (*FilesDecoderIterator) Close

func (d *FilesDecoderIterator) Close() error

Close closes the last file, if there is one open, and causes all subsequent calls to return io.EOF.

func (*FilesDecoderIterator) Done

func (d *FilesDecoderIterator) Done() bool

Done returns true iff there are no more readers to produce.

func (*FilesDecoderIterator) Next

func (d *FilesDecoderIterator) Next() (*Decoder, error)

Next returns a new Decoder if possible, or io.EOF.

type MultiDecoder

type MultiDecoder struct {
	// contains filtered or unexported fields
}

MultiDecoder wraps multiple readers, either specified directly or through a reader iterator interface that can produce them on demand. This is useful when concatenating multiple file shards together as a single record store.

Example
package main

import (
	"bytes"
	"fmt"
	"log"
	"testing/fstest"

	"entrogo.com/stuffedio/recordio"
)

const prefix = "journal"

func fakeJournalData(start, end uint64) (string, []byte) {
	buf := new(bytes.Buffer)
	enc := recordio.NewEncoder(buf)
	defer enc.Close()

	for i := start; i < end; i++ {
		if _, err := enc.Encode([]byte(fmt.Sprintf("Record with number %d", i))); err != nil {
			log.Fatalf("Error appending: %v", err)
		}
	}

	return fmt.Sprintf("%s-%016x", prefix, start), buf.Bytes()
}

func main() {
	// Create a fake file system with some data in it.
	fakeFS := make(fstest.MapFS)

	var names []string

	ends := []uint64{3, 5, 7}
	start := uint64(1)
	for _, end := range ends {
		name, val := fakeJournalData(start, end)
		names = append(names, name)
		fakeFS[name] = &fstest.MapFile{Data: val}
		start = end
	}

	// Create a MultiDecoder WAL that knows about these files, using a FilesDecoderIterator.
	dec := recordio.NewMultiDecoderIter(recordio.NewFilesDecoderIterator(fakeFS, names))
	defer dec.Close()

	// Read entries in order.
	for !dec.Done() {
		val, err := dec.Next()
		if err != nil {
			log.Fatalf("Error reading next value: %v", err)
		}
		fmt.Printf("%q\n", val)
	}

}
Output:

"Record with number 1"
"Record with number 2"
"Record with number 3"
"Record with number 4"
"Record with number 5"
"Record with number 6"

func NewMultiDecoder

func NewMultiDecoder(readers []*Decoder) *MultiDecoder

NewMultiDecoder creates a MultiDecoder from a slice of Decoders. These are ordered, and will be consumed in the order given.
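
A sketch for a fixed set of already-open sources (assuming r1 and r2 are io.Readers holding stuffed log data):

d := NewMultiDecoder([]*Decoder{NewDecoder(r1), NewDecoder(r2)})
defer d.Close()

for !d.Done() {
	b, err := d.Next()
	if err != nil {
		log.Fatalf("Error reading: %v", err)
	}
	fmt.Println(string(b))
}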

func NewMultiDecoderIter

func NewMultiDecoderIter(ri DecoderIterator) *MultiDecoder

NewMultiDecoderIter creates a MultiDecoder where each Decoder is requested, one at a time, through the given DecoderIterator. The iterator is expected to return a nil Decoder if there are no more readers.

func (*MultiDecoder) Close

func (d *MultiDecoder) Close() error

Close closes the currently active underlying Decoder and the DecoderIterator, if any.

func (*MultiDecoder) Consumed

func (d *MultiDecoder) Consumed() int64

Consumed returns the total number of bytes consumed thus far.

func (*MultiDecoder) Done

func (d *MultiDecoder) Done() bool

Done returns whether this MultiDecoder has exhausted all underlying readers.

func (*MultiDecoder) Next

func (d *MultiDecoder) Next() ([]byte, error)

Next gets the next record for these readers.

type ReverseDecoder

type ReverseDecoder struct {
	// contains filtered or unexported fields
}

ReverseDecoder can be used on an io.ReaderAt to read stuffed records in reverse. It does so by searching backwards for delimiters, and then reading them forward from there. Reproduces errors in the same way that a forward reader would.

Example
msgs := []string{
	"Message 1",
	"Message 2",
	"Message 3",
	"Message 4",
}

buf := new(bytes.Buffer)

s := NewEncoder(buf)
for _, msg := range msgs {
	if _, err := s.Encode([]byte(msg)); err != nil {
		log.Fatalf("Error appending: %v", err)
	}
}

u := NewReverseDecoder(bytes.NewReader(buf.Bytes()), int64(buf.Len()))
for !u.Done() {
	b, err := u.Next()
	if err != nil {
		log.Fatalf("Error reading in reverse: %v", err)
	}
	fmt.Printf("%q\n", b)
}

Output:

"Message 4"
"Message 3"
"Message 2"
"Message 1"

func NewReverseDecoder

func NewReverseDecoder(r io.ReaderAt, size int64) *ReverseDecoder

NewReverseDecoder creates a new unstuffer that works in reverse. Because an io.ReaderAt does not supply a size, and there is no good standard interface to depend on for this, the caller must indicate how long the underlying data is. This allows the reverse reader to start at the end.
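
For a file, the required size is available from Stat. A sketch (assuming mypath names an existing stuffed log):

f, err := os.Open(mypath)
if err != nil {
	log.Fatalf("Error opening: %v", err)
}
defer f.Close()

info, err := f.Stat()
if err != nil {
	log.Fatalf("Error getting size: %v", err)
}

// *os.File implements io.ReaderAt, so it can be used directly.
d := NewReverseDecoder(f, info.Size())
for !d.Done() {
	b, err := d.Next()
	if err != nil {
		log.Fatalf("Error reading in reverse: %v", err)
	}
	fmt.Println(string(b))
}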

func (*ReverseDecoder) Close

func (d *ReverseDecoder) Close() error

Close closes the underlying reader if it is also an io.Closer.

func (*ReverseDecoder) Done

func (d *ReverseDecoder) Done() bool

Done indicates whether this reverse unstuffer has reached (and produced) the first record in the underlying reader.

func (*ReverseDecoder) Next

func (d *ReverseDecoder) Next() ([]byte, error)

Next attempts to find and produce the next record in reverse in the underlying stream.
