savior

package module
v0.0.0-...-6034e87 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 18, 2020 License: MIT Imports: 12 Imported by: 66

README

savior

MIT licensed Build Status codecov Go Report Card

savior is an optimistic attempt at providing an abstract layer over various compression formats (like deflate, gzip, bzip2) and archive formats (like zip, tar, etc.) all while providing reasonably good save/resume support.

Concepts

There are two main interfaces in savior: Sources and Extractors.

Sources

A savior.Source represents a data stream that can be read from start to end.

For example, a source might be:

  • An HTTP(S) resource on a server
  • A file on disk
  • A buffer in memory
  • Another source being decompressed from FLATE, gzip, or bzip2

savior ships with seeksource, which covers the former (in combination with htfs), and flatesource, gzipsource, bzip2source, which cover the latter.

A source's size doesn't need to be known in advance, although sources can optionally implement a Progress() method that returns a float64 in [0,1] — indicating how much of the stream has been consumed.

Random access is not required of sources, but saving and resuming is.

Before using a source, the Resume() method should always be called:

  • If it's a fresh source, a nil checkpoint should be passed to Resume. This indicates "start from the beginning"
  • If we're resuming mid-stream, a *SourceCheckpoint is passed. The source shall then try to resume using that checkpoint information. Whether it fails or not, the offset it returns should be valid. (TL;DR if it fails, just return a 0 offset)

*SourceCheckpoint are typically saved to non-volatile storage - the test suite ensures that they can be encoded/decoded via encoding/gob.

Decompressing sources like flatesource and bzip2source can typically only checkpoint on a block boundary. For that reason, it's legal for sources to return a nil *SourceCheckpoint from the Save() method. It just means that if you stop reading there and resume later, it'll start over from the beginning.

To account for the fact that sources may save an earlier position than you needed, the DiscardByRead function is exposed, letting you advance by a number of bytes to resume reading exactly where you needed.

Note: flatesource, gzipsource and bzip2source are all implemented on top of forks of golang's flate, gzip and bzip2 extractors, which can be found at itchio/kompress

Extractors

Extractors abstract over archive formats, like .tar and .zip, which may contain multiple entries (directories, files, symlinks).

It's not easy to find a common interface between those, since the .zip format knows about all entries and their sizes in advance, whereas the .tar format has no dictionary, entries are discovered one by one as the archive is extracted.

Extractors all have their own, specific New() method, taking whatever arguments they need to read and extract an archive.

However, they share a few common methods:

  • Resume asks an extractor to start work, either from scratch or from a checkpoint. It returns an ExtractorResult, which contains a list of *Entry - all extractors are able to return the complete contents of the archive once it is fully extracted.
  • SetSaveConsumer sets a SaveConsumer for the extractor, which it'll use whenever it's ready to save (and SaveConsumer.ShouldSave returns true). Extractor state are saved as *ExtractorCheckpoint, which are guaranteed to be encodable via encoding/gob. SaveConsumer implementations can also stop decompression by returning AfterSaveStop from Save().
  • SetConsumer sets a *state.Consumer for the extractor, which it'll use to send log messages and emit progress info (a float64 in a [0,1] range).
  • Features returns the set of features supported by an extractor, including how good its resume support is (non-existent, between entries, or mid-entries), whether it supports preallocation, etc.

Extractors can use sources internally, for example:

  • A gzipsource can be passed to tarextractor to extract a .tar.gz file. The tarextractor will checkpoint any underlying source, so it doesn't need to know that the whole tar is in fact read from a gzip stream.
  • The zipextractor will use a flatesource for entries compressed with the Deflate method - this allows it to checkpoint mid-entry.

Note: tarextractor and zipextractor are implemented on top of forks of golang's zip and tar archive handlers, which can be found at itchio/arkive.

Note: extractors are not responsible for closing sinks - the sinks are created and closed by the caller itself.

Sinks

A Sink is typically what an extractor extracts "to". In the simplest case, it's a FolderSink, which writes directly to the filesystem. However, other implementations exist, such as checker.Sink, used in test to extract in-memory and validate the decompressed data against a reference set.

FolderSink is opinionated — in particular, it:

  • Writes symlinks as text files on Windows
    • Many versions of Windows support junctions, but they have different semantics, so they're not used
    • Many versions of Windows support actual symlinks, but they require Administrator privileges to create, so they're not used
    • Recent builds of Windows 10 support creating symlinks without Administrator privileges, but that's hardly the common denominator, so they're not used
    • Writing symlinks as text files with the os.SymlinkMode permission matches the way they're stored in .zip files, or various *nix filesystems
  • Always creates necessary parent folders (with 0755)
    • If GetWriter() is called for a file entry with CanonicalPath a/b/c, the a/ and a/b/ folders will be created
  • Does whatever it take to make sure the filesystem entry is of the right type
    • If GetWriter() is called for a file entry with CanonicalPath plugin, but plugin is currently a folder or symlink on disk, it will be removed first and re-created as a file
  • Adjusts permissions so that they're at least 0644 (or more permissive). This avoids creating files which we don't have permission to erase or overwrite later.
  • Truncates file to entry.UncompressedSize when Preallocate() is called, but not when GetWriter() is called, so that archive formats which have a zero UncompressedSize still work when resuming mid-entry.
License

savior is released under the MIT license, see the LICENSE file in this repository.

Documentation

Index

Constants

View Source
const (
	// ModeMask is or'd with files walked by butler
	ModeMask = 0666

	// LuckyMode is used when wiping in last-chance mode
	LuckyMode = 0777

	// DirMode is the default mode for directories created by butler
	DirMode = 0755
)
View Source
const (
	// EntryKindDir is the kind for a directory
	EntryKindDir = 0
	// EntryKindSymlink is the kind for a symlink
	EntryKindSymlink = 1
	// EntryKindFile is the kind for a file
	EntryKindFile = 2
)

Variables

View Source
var EnableLegacyPreallocate = os.Getenv("SAVIOR_LEGACY_PREALLOCATE") == "1"
View Source
var ErrStop = errors.New("copy was stopped after save!")

ErrStop is returned when decompression has been stopped by a SaveConsumer returning AfterActionStop.

View Source
var ErrUninitializedSource = errors.New("tried to read from source before Resume() was called")

Functions

func Debugf

func Debugf(format string, args ...interface{})

Debugf prints a message if the environment variable SAVIOR_DEBUG is set to "1"

func DiscardByRead

func DiscardByRead(source Source, delta int64) error

DiscardByRead advances a source by `delta` bytes by reading data then throwing it away. This is useful in case a source made a checkpoint shortly before the offset we actually need to resume from.

func NopConsumer

func NopConsumer() *state.Consumer

Returns a *state.Consumer that prints nothing at all.

Types

type AfterSaveAction

type AfterSaveAction int
const (
	// Continue decompressing after the checkpoint has been emitted
	AfterSaveContinue AfterSaveAction = 1
	// Stop decompression after the checkpoint has been emitted (returns ErrStop)
	AfterSaveStop AfterSaveAction = 2
)

type CallbackSourceSaveConsumer

type CallbackSourceSaveConsumer struct {
	OnSave func(checkpoint *SourceCheckpoint) error
}

func (*CallbackSourceSaveConsumer) Save

func (cssc *CallbackSourceSaveConsumer) Save(checkpoint *SourceCheckpoint) error

type Copier

type Copier struct {
	// params
	SaveConsumer SaveConsumer
	// contains filtered or unexported fields
}

func NewCopier

func NewCopier(SaveConsumer SaveConsumer) *Copier

func (*Copier) Do

func (c *Copier) Do(params *CopyParams) error

func (*Copier) Stop

func (c *Copier) Stop()

type CopyParams

type CopyParams struct {
	Src   io.Reader
	Dst   io.Writer
	Entry *Entry

	Savable Savable

	EmitProgress EmitProgressFunc
}

type EmitProgressFunc

type EmitProgressFunc func()

type Entry

type Entry struct {
	// CanonicalPath is a slash-separated path relative to the
	// root of the archive
	CanonicalPath string

	// Kind describes whether it's a regular file, a directory, or a symlink
	Kind EntryKind

	// Mode contains read/write/execute permissions, we're mostly interested in execute
	Mode os.FileMode

	// CompressedSize may be 0, if the extractor doesn't have the information
	CompressedSize int64

	// UncompressedSize may be 0, if the extractor doesn't have the information
	UncompressedSize int64

	// WriteOffset is useful if this entry struct is included in an extractor
	// checkpoint
	WriteOffset int64

	// Linkname describes the target of a symlink if the entry is a symlink
	// and the format we're extracting has symlinks in metadata rather than its contents
	Linkname string
}

An Entry is a struct that should have *just the right fields* to be useful in an extractor checkpoint. They represent a file, directory, or symlink

func (*Entry) String

func (entry *Entry) String() string

type EntryKind

type EntryKind int

func (EntryKind) String

func (ek EntryKind) String() string

type EntryWriter

type EntryWriter interface {
	io.WriteCloser

	// Sync should commit (to disk or otherwise) all the data written so far
	// to the entry.
	Sync() error
}

An EntryWriter is an io.WriteCloser that you can Sync(). This is important as saving a checkpoint (while in the middle of decompressing an archive) is only useful if we *know* that all the data we say we've decompressed is actually on disk (and not just stuck in a OS buffer somewhere).

Note that the user of an EntryWriter is not responsible for closing it. It will be closed on the next `sink.GetWriter()` call, or eventually at `sink.Close()`

func NewNopEntryWriter

func NewNopEntryWriter() EntryWriter

type Extractor

type Extractor interface {
	// Set save consumer for determining checkpoint frequency and persisting them.
	SetSaveConsumer(saveConsumer SaveConsumer)
	// Set *state.Consumer for logging
	SetConsumer(consumer *state.Consumer)
	// Perform extraction, optionally resuming from a checkpoint (if non-nil)
	// Sink is not closed, it should be closed by the caller, see simple_extract
	// for an example.
	Resume(checkpoint *ExtractorCheckpoint, sink Sink) (*ExtractorResult, error)
	// Returns the supported features for this extractor
	Features() ExtractorFeatures
}

An extractor is able to decompress entries of an archive format (like .zip, .tar, .7z, etc.), preferably in a resumable fashion.

type ExtractorCheckpoint

type ExtractorCheckpoint struct {
	SourceCheckpoint *SourceCheckpoint
	EntryIndex       int64
	Entry            *Entry
	Progress         float64
	Data             interface{}
}

type ExtractorFeatures

type ExtractorFeatures struct {
	// Short name for the extractor, like "zip", or "tar"
	Name string
	// Level of support for resumable decompression, if any
	ResumeSupport ResumeSupport
	// Is pre-allocating files supported?
	Preallocate bool
	// Is random access supported?
	RandomAccess bool
	// Features for the underlying source
	SourceFeatures *SourceFeatures
}

func (ExtractorFeatures) String

func (ef ExtractorFeatures) String() string

type ExtractorResult

type ExtractorResult struct {
	Entries []*Entry
}

func (*ExtractorResult) Size

func (er *ExtractorResult) Size() int64

Returns the total size of all listed entries, in bytes

func (*ExtractorResult) Stats

func (er *ExtractorResult) Stats() string

Returns a human-readable summary of the files, directories and symbolic links in this result.

type FileSource

type FileSource interface {
	SeekSource

	Close() error
}

FileSource is a SeekSource that can be closed (to release associated resources)

type FolderSink

type FolderSink struct {
	Directory string
	Consumer  *state.Consumer
	// contains filtered or unexported fields
}

func (*FolderSink) Close

func (fs *FolderSink) Close() error

func (*FolderSink) GetWriter

func (fs *FolderSink) GetWriter(entry *Entry) (EntryWriter, error)

func (*FolderSink) Mkdir

func (fs *FolderSink) Mkdir(entry *Entry) error

func (*FolderSink) Nuke

func (fs *FolderSink) Nuke() error

func (*FolderSink) Preallocate

func (fs *FolderSink) Preallocate(entry *Entry) error
func (fs *FolderSink) Symlink(entry *Entry, linkname string) error

type NopSink

type NopSink struct {
	Directory string
	Consumer  *state.Consumer
	// contains filtered or unexported fields
}

NopSink does not write anything anywhere

func (*NopSink) Close

func (ns *NopSink) Close() error

func (*NopSink) GetWriter

func (ns *NopSink) GetWriter(entry *Entry) (EntryWriter, error)

func (*NopSink) Mkdir

func (ns *NopSink) Mkdir(entry *Entry) error

func (*NopSink) Nuke

func (ns *NopSink) Nuke() error

func (*NopSink) Preallocate

func (ns *NopSink) Preallocate(entry *Entry) error
func (ns *NopSink) Symlink(entry *Entry, linkname string) error

type ResumeSupport

type ResumeSupport int
const (
	// While the extractor exposes Save/Resume, in practice, resuming
	// will probably waste I/O and processing redoing a lot of work
	// that was already done, so it's not recommended to run it against
	// a networked resource
	ResumeSupportNone ResumeSupport = 0
	// The extractor can save/resume between each entry, but not in the middle of an entry
	ResumeSupportEntry ResumeSupport = 1
	// The extractor can save/resume within an entry, on a deflate/bzip2 block boundary for example
	ResumeSupportBlock ResumeSupport = 2
)

func (ResumeSupport) String

func (rs ResumeSupport) String() string

type Savable

type Savable interface {
	WantSave()
}

type SaveConsumer

type SaveConsumer interface {
	// Returns true if a checkpoint should be emitted. `copiedBytes` is the
	// amount of bytes extracted since the last time ShouldSave was called.
	ShouldSave(copiedBytes int64) bool
	// Should persist a checkpoint and return instructions on whether to continue
	// or stop decompression.
	Save(checkpoint *ExtractorCheckpoint) (AfterSaveAction, error)
}

func NopSaveConsumer

func NopSaveConsumer() SaveConsumer

Returns a SaveConsumer that never asks for a checkpoint, ignores any emitted checkpoints, and always tells the extractor to continue decompressing.

type SeekSource

type SeekSource interface {
	Source

	// Tell returns the current offset of the seeksource
	Tell() int64
	// Size returns the total number of bytes the seeksource reads
	Size() int64
	// Section returns a SeekSource that reads from start to start+size
	// Note that the returned SeekSource will use the same underlying
	// io.ReadSeeker, so the original SeekSource cannot be used anymore.
	// The returned SeekSource should be Resume()'d before being used
	Section(start int64, size int64) (SeekSource, error)
}

SeekSource is a Source with extra powers: you can know its size, tell which offset it's currently at, and ask for a view of a subsection of it.

type Sink

type Sink interface {
	// Mkdir creates a directory (and parents if needed)
	Mkdir(entry *Entry) error

	// Symlink creates a symlink
	Symlink(entry *Entry, linkname string) error

	// GetWriter returns a writer at entry.WriteOffset. Any previously
	// returned writer gets closed at this point.
	GetWriter(entry *Entry) (EntryWriter, error)

	// Preallocate space for a file based on the entry's UncompressedSize
	Preallocate(entry *Entry) error

	// Remove everything written so far
	Nuke() error

	// Close this sink, including all pending writers
	Close() error
}

A Sink is what extractors extract to. Typically, that would be a folder on a filesystem, but it could be anything else: repackaging as another archive type, uploading transparently as small blocks.

Think of it as a very thin slice of the `os` package that can be implemented completely independently of the filesystem.

type Source

type Source interface {
	// Resume tries to use a checkpoint to start reading again at the checkpoint.
	// It *must be called* before using the source, whether or not checkpoint is
	// an actual mid-stream checkpoint or just the nil checkpoint (for Offset=0)
	Resume(checkpoint *SourceCheckpoint) (int64, error)

	// Register a source save consumer for this source
	SetSourceSaveConsumer(ssc SourceSaveConsumer)

	// Let the source know that it should emit a checkpoint as soon as it can.
	WantSave()

	// Progress returns how much of the stream has been consumed, in a [0,1] range.
	// If this source does not support progress reporting (ie. the total size of
	// the stream is unknown), then Progress returns a negative value (typically -1).
	Progress() float64

	Features() SourceFeatures

	io.Reader

	// io.ByteReader is embedded in Source so it can be used by the `flate` package
	// without it wrapping it in a `bufio.Reader`
	io.ByteReader
}

A Source represents a data stream that does not provide random access, is not seekable, but for which checkpoints can be emitted, allowing the consumer to resume reading from the stream later.

Sources typically are either a limited interface for a more powerful resource (*os.File, eos.File, both of which provide seeking and random access), or a more powerful interface to resources typically exposed as simply an `io.Reader` in the golang standard library (flate streams, gzip streams, bzip2 streams)

Sources that expose a random access resource tend to be able to `Save()` at any given byte, whereas sources that are decompressors are typically only able to save on a block boundary.

type SourceCheckpoint

type SourceCheckpoint struct {
	// Offset is the position in the stream, in bytes
	// It should be non-zero, as the checkpoint for offset 0 is simply nil
	Offset int64

	// Data is a source-specific pointer to a struct, which must be
	// registered with `gob` so that it can be serialized and deserialized
	Data interface{}
}

SourceCheckpoint contains all the information needed for a source to resume from a given offset.

type SourceFeatures

type SourceFeatures struct {
	Name          string
	ResumeSupport ResumeSupport
}

type SourceSaveConsumer

type SourceSaveConsumer interface {
	// Send a checkpoint to the consumer. The consumer may
	// retain the checkpoint, so its contents must not change
	// after it is sent.
	Save(checkpoint *SourceCheckpoint) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL