blockfmt

Documentation

Overview

Package blockfmt implements routines for reading and writing compressed and aligned ion blocks to/from backing storage.

The APIs in this package are designed with object storage in mind as the primary backing store.

The CompressionWriter type can be used to write aligned ion blocks (see ion.Chunker) to backing storage, and the CompressionReader type can provide positional access to compressed blocks within the backing storage.

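For orientation, here is a minimal, hedged sketch of the write path described above. It assumes the package is imported as github.com/SnellerInc/sneller/ion/blockfmt and feeds a zero-filled placeholder block through a CompressionWriter backed by a BufferUploader; real callers would write aligned ion chunks produced by an ion.Chunker.

package main

import (
	"log"

	"github.com/SnellerInc/sneller/ion/blockfmt"
)

func main() {
	up := &blockfmt.BufferUploader{PartSize: 8 << 20}
	cw := &blockfmt.CompressionWriter{
		Output:     up,
		Comp:       blockfmt.CompressorByName("zstd"),
		InputAlign: 1 << 20, // every Write must be exactly this many bytes
	}
	// SkipChecks lets this sketch feed a zero-filled placeholder block;
	// real callers write aligned ion chunks produced by an ion.Chunker.
	cw.SkipChecks()
	if _, err := cw.Write(make([]byte, 1<<20)); err != nil {
		log.Fatal(err)
	}
	if err := cw.Close(); err != nil {
		log.Fatal(err)
	}
	log.Printf("wrote %d block(s), %d compressed bytes", cw.WrittenBlocks(), len(up.Bytes()))
}
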
Index

Constants

const (
	// DefaultMaxReadsInFlight is the default maximum
	// number of outstanding read operations in Converter.Run
	DefaultMaxReadsInFlight = 400
	// DefaultMaxBytesInFlight is the default maximum
	// number of bytes in flight in Converter.Run
	DefaultMaxBytesInFlight = 80 * 1024 * 1024
)
const (
	// KeyLength is the length of
	// the key that needs to be provided
	// to Sign and DecodeIndex.
	// (The contents of the key should
	// be from a cryptographically secure
	// source of random bytes.)
	KeyLength = 32
	// SignatureLength is the length
	// of the signature appended
	// to the index objects.
	SignatureLength = KeyLength + 2
)
const IndexVersion = 1

IndexVersion is the current version number of the index format.

const Version = "blockfmt/compressed/v2"

Version is the textual version of the objects produced by this package. If we start producing backwards-incompatible objects, this version number ought to be bumped.

Variables

var (
	// ErrBadMAC is returned when a signature
	// for an object does not match the
	// computed MAC.
	ErrBadMAC = errors.New("bad index signature")
)
var ErrETagChanged = errors.New("FileTree: ETag changed")

ErrETagChanged is returned by FileTree.Append when attempting to perform an insert with a file whose ETag has changed.

var ErrIndexObsolete = errors.New("index version obsolete")

ErrIndexObsolete is returned when OpenIndex detects an index file with a version number lower than IndexVersion.

var SuffixToFormat = make(map[string]func(hints []byte) (RowFormat, error))

SuffixToFormat maps known filename suffixes to constructors for the corresponding RowFormat objects.

Functions

func ETag

func ETag(ofs UploadFS, up Uploader, fullpath string) (string, error)

ETag gets the ETag for the provided Uploader. If the Uploader has an ETag() method, that method is used directly; otherwise ofs.ETag is used to determine the ETag.

func IsFatal

func IsFatal(err error) bool

IsFatal returns true if the error is an error known to be fatal when returned from blockfmt.Format.Convert. (A fatal error is one that will not disappear on a retry.)

func Sign

func Sign(key *Key, idx *Index) ([]byte, error)

Sign encodes an index in a binary format and signs it with the provided HMAC key.

See DecodeIndex for authenticating and decoding a signed index blob.

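The following is a hedged round-trip sketch of Sign and DecodeIndex; the index name, the "zstd" Algo choice, and the import path are illustrative assumptions rather than part of this documentation.

package main

import (
	"crypto/rand"
	"log"

	"github.com/SnellerInc/sneller/ion/blockfmt"
)

func main() {
	// KeyLength bytes from a cryptographically secure source
	var key blockfmt.Key
	if _, err := rand.Read(key[:]); err != nil {
		log.Fatal(err)
	}
	idx := &blockfmt.Index{
		Name: "example-table", // hypothetical index name
		Algo: "zstd",          // compression for the index contents
	}
	signed, err := blockfmt.Sign(&key, idx)
	if err != nil {
		log.Fatal(err)
	}
	// FlagSkipInputs avoids decoding Index.Inputs, which queries do not need
	out, err := blockfmt.DecodeIndex(&key, signed, blockfmt.FlagSkipInputs)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("decoded index:", out.Name)
}
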
func Validate

func Validate(src io.Reader, t *Trailer, diag io.Writer) int

Validate validates the blockfmt-formatted object in src. Any errors encountered are written as distinct lines to diag. If nothing is written to diag, the caller may assume that src contained no errors.

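As a usage sketch (the file name is hypothetical and the import path is assumed), the trailer can be read first and the whole object then checked with Validate:

package main

import (
	"bytes"
	"io"
	"log"
	"os"

	"github.com/SnellerInc/sneller/ion/blockfmt"
)

func main() {
	f, err := os.Open("packed.ion.zst") // hypothetical blockfmt-formatted object
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	info, err := f.Stat()
	if err != nil {
		log.Fatal(err)
	}
	t, err := blockfmt.ReadTrailer(f, info.Size())
	if err != nil {
		log.Fatal(err)
	}
	// rewind so Validate reads the object from the beginning
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		log.Fatal(err)
	}
	var diag bytes.Buffer
	n := blockfmt.Validate(f, t, &diag)
	if diag.Len() > 0 {
		log.Fatalf("object is corrupt:\n%s", diag.String())
	}
	log.Println("object looks OK; Validate returned", n)
}
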
func WriteDescriptor

func WriteDescriptor(buf *ion.Buffer, st *ion.Symtab, desc *Descriptor)

WriteDescriptor writes a single descriptor to buf given the provided symbol table

func WriteDescriptors

func WriteDescriptors(buf *ion.Buffer, st *ion.Symtab, contents []Descriptor)

Types

type Blockdesc

type Blockdesc struct {
	// Offset is the offset of the *compressed*
	// output data.
	Offset int64
	// Chunks is the number of chunks
	// (with decompressed length equal to
	// 1 << Trailer.BlockShift) within
	// this block
	Chunks int
}

Blockdesc is a descriptor that is attached to each block within a Trailer.

type BufferUploader

type BufferUploader struct {
	PartSize int
	// contains filtered or unexported fields
}

BufferUploader is a simple in-memory implementation of Uploader.

func (*BufferUploader) Bytes

func (b *BufferUploader) Bytes() []byte

Bytes returns the final upload result after Close() has been called.

func (*BufferUploader) Close

func (b *BufferUploader) Close(final []byte) error

func (*BufferUploader) MinPartSize

func (b *BufferUploader) MinPartSize() int

MinPartSize implements Uploader.MinPartSize

func (*BufferUploader) Parts

func (b *BufferUploader) Parts() int

func (*BufferUploader) Size

func (b *BufferUploader) Size() int64

func (*BufferUploader) Upload

func (b *BufferUploader) Upload(part int64, contents []byte) error

Upload implements Uploader.Upload

type Collector

type Collector struct {
	// Pattern is the glob pattern
	// that input objects should match.
	Pattern string
	// Start is a filename below which
	// all inputs are ignored. (Start can
	// be used to begin a Collect operation
	// where a previous one has left off
	// by using the last returned path as
	// the Start value for the next collection operation.)
	Start string
	// MaxItems, if non-zero, is the maximum number
	// of items to collect.
	MaxItems int
	// MaxSize, if non-zero, is the maximum size
	// of items to collect.
	MaxSize int64
	// Fallback is the function used to
	// determine the format of an input file.
	Fallback func(string) RowFormat
}

Collector is a set of configuration parameters for collecting a list of objects.

func (*Collector) Collect

func (c *Collector) Collect(from InputFS) ([]Input, bool, error)

Collect collects items from the provided InputFS and returns them as a list of Inputs, along with a boolean indicating whether or not the results are the complete list of files.

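A hedged sketch of driving Collect from a directory-backed InputFS follows; the directory, glob pattern, and limits are invented for illustration.

package main

import (
	"log"

	"github.com/SnellerInc/sneller/ion/blockfmt"
)

func main() {
	ifs := blockfmt.NewDirFS("/data/incoming") // hypothetical input directory
	c := &blockfmt.Collector{
		Pattern:  "logs/*.json",
		MaxItems: 1000,
		// Fallback decides the format for names whose suffix is not in
		// SuffixToFormat; a real deployment would return a concrete RowFormat.
		Fallback: func(name string) blockfmt.RowFormat { return nil },
	}
	inputs, complete, err := c.Collect(ifs)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("collected %d inputs (complete list: %v)", len(inputs), complete)
}
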
type CompressionWriter

type CompressionWriter struct {
	// Output is the destination to which
	// the compressed data should be uploaded.
	Output Uploader
	// Comp is the compression algorithm to use.
	Comp Compressor
	// InputAlign is the expected input alignment
	// of data blocks. CompressionWriter will disallow
	// calls to Write that do not have length
	// equal to InputAlign.
	InputAlign int
	// TargetSize is the target size of flushes
	// to Output. If TargetSize is zero, then
	// the output will be flushed around
	// Output.MinPartSize
	TargetSize int
	// Trailer is the trailer being
	// built by the compression writer
	Trailer
	// MinChunksPerBlock sets the minimum
	// number of chunks per output block.
	// Below this threshold, chunks are merged
	// into adjacent blocks.
	// See also MultiWriter.MinChunksPerBlock
	MinChunksPerBlock int
	// contains filtered or unexported fields
}

CompressionWriter is a single-stream io.Writer that accepts blocks from an ion.Chunker and concatenates and compresses them into an output format that allows for seeking through the decompressed blocks without actually performing any decompression in advance.

func (*CompressionWriter) Close

func (w *CompressionWriter) Close() error

Close closes the compression writer and finalizes the output upload.

func (*CompressionWriter) Flush

func (w *CompressionWriter) Flush() error

func (*CompressionWriter) SetMinMax

func (f *CompressionWriter) SetMinMax(path []string, min, max ion.Datum)

SetMinMax sets the `min` and `max` values for the next ion chunk. This method should only be called once for each path.

func (*CompressionWriter) SkipChecks

func (w *CompressionWriter) SkipChecks()

SkipChecks disables some runtime checks of the input data, which is ordinarily expected to be ion data. Do not use this except for testing.

func (*CompressionWriter) Write

func (w *CompressionWriter) Write(p []byte) (n int, err error)

Write implements io.Writer. Each call to Write must be of w.InputAlign bytes.

func (*CompressionWriter) WrittenBlocks

func (w *CompressionWriter) WrittenBlocks() int

WrittenBlocks returns the number of blocks written to the CompressionWriter (i.e. the number of calls to w.Write)

type Compressor

type Compressor interface {
	Name() string
	Compress(src, dst []byte) ([]byte, error)
	io.Closer
}

func CompressorByName

func CompressorByName(algo string) Compressor

CompressorByName produces the Compressor associated with the provided algorithm name, or nil if no such algorithm is known to the library.

Valid values include:

"zstd"
"zion"
"zion+zstd" (equivalent to "zion")
"zion+iguana_v0"

type Converter

type Converter struct {
	// Prepend, if R is not nil,
	// is a blockfmt-formatted stream
	// of data to prepend to the output stream.
	Prepend struct {
		// R should read data from Trailer
		// starting at offset Trailer.Blocks[0].Offset.
		// Converter will read bytes up to Trailer.Offset.
		R       io.ReadCloser
		Trailer *Trailer
	}
	// Constants is the list of templated constants
	// to be inserted into the ingested data.
	Constants []ion.Field

	// Inputs is the list of input
	// streams that need to be converted
	// into the output format.
	Inputs []Input
	// Output is the Uploader to which
	// data will be written. The Uploader
	// will be wrapped in a CompressionWriter
	// or MultiWriter depending on the number
	// of input streams and the parallelism setting.
	Output Uploader
	// Comp is the name of the compression
	// algorithm used for uploaded data blocks.
	Comp string
	// Align is the pre-compression alignment
	// of chunks written to the uploader.
	Align int
	// FlushMeta is the maximum interval
	// at which metadata is flushed.
	// Note that metadata may be flushed
	// below this interval if there is not
	// enough input data to make the intervals this wide.
	FlushMeta int
	// TargetSize is the target size of
	// chunks written to the Uploader.
	TargetSize int
	// Parallel is the maximum parallelism of
	// uploads. If Parallel is <= 0, then
	// GOMAXPROCS is used instead.
	Parallel int
	// MinInputBytesPerCPU is used to determine
	// the level of parallelism used for converting data.
	// If this setting is non-zero, then the converter
	// will try to ensure that there are at least this
	// many bytes of input data for each independent
	// parallel stream used for conversion.
	//
	// Picking a larger setting for MinInputBytesPerCPU
	// will generally increase the efficiency of the
	// conversion (in bytes converted per CPU-second)
	// and also the compactness of the output data.
	MinInputBytesPerCPU int64
	// MaxReadsInFlight is the maximum number of
	// prefetched reads in flight. If this is less
	// than or equal to zero, then DefaultMaxReadsInFlight is used.
	MaxReadsInFlight int

	// DisablePrefetch, if true, disables
	// prefetching of inputs.
	DisablePrefetch bool
	// contains filtered or unexported fields
}

Converter performs single- or multi-stream conversion of a list of inputs in parallel.

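A rough sketch of a single-input conversion follows; it assumes ".json" is one of the registered suffixes for MustSuffixToFormat, and the file name, alignment, and flush interval are illustrative only.

package main

import (
	"log"
	"os"

	"github.com/SnellerInc/sneller/ion/blockfmt"
)

func main() {
	f, err := os.Open("events.json") // hypothetical newline-delimited JSON input
	if err != nil {
		log.Fatal(err)
	}
	cv := &blockfmt.Converter{
		Inputs: []blockfmt.Input{{
			Path: "events.json",
			R:    f,
			F:    blockfmt.MustSuffixToFormat(".json"),
		}},
		Output:    &blockfmt.BufferUploader{PartSize: 8 << 20},
		Comp:      "zstd",
		Align:     1 << 20,  // pre-compression chunk alignment
		FlushMeta: 50 << 20, // metadata flush interval
	}
	if err := cv.Run(); err != nil {
		// per-input failures are also recorded in cv.Inputs[i].Err
		log.Fatal(err)
	}
	log.Printf("converted into %d output blocks", len(cv.Trailer().Blocks))
}
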
func (*Converter) MultiStream

func (c *Converter) MultiStream() bool

MultiStream returns whether the configuration of Converter would lead to a multi-stream upload.

func (*Converter) Run

func (c *Converter) Run() error

Run runs the conversion operation and returns the first error it encounters. Additionally, it will populate c.Inputs[*].Err with any errors associated with the inputs. Note that Run stops at the first encountered error, so if one of the Inputs has Err set, then subsequent items in Inputs may not have been processed at all.

func (*Converter) Trailer

func (c *Converter) Trailer() *Trailer

type Decoder

type Decoder struct {
	// BlockShift is the log2 of the block size.
	// BlockShift is set automatically by Decoder.Set.
	BlockShift int
	// Algo is the algorithm to use for decompressing
	// the input data blocks.
	// Algo is set automatically by Decoder.Set.
	Algo string

	// Fields is the dereference push-down hint
	// for the fields that should be decompressed
	// from the input. Note that the zero value (nil)
	// means all fields, but a zero-length slice explicitly
	// means zero fields (i.e. decode empty structures).
	Fields []string

	// Malloc should return a slice with the given size.
	// If Malloc is nil, then make([]byte, size) is used.
	// If Malloc is non-nil, then Free should be set.
	Malloc func(size int) []byte
	// If Malloc is set, then Free should be
	// set to a corresponding function to release
	// data allocated via Malloc.
	Free func([]byte)
	// contains filtered or unexported fields
}

Decoder is used to decode blocks from a Trailer and an associated data stream.

func (*Decoder) Copy

func (d *Decoder) Copy(dst io.Writer, src io.Reader) (int64, error)

Copy incrementally decompresses data from src and writes it to dst. It returns the number of bytes written to dst and the first error encountered, if any.

Copy always calls dst.Write with memory allocated via d.Malloc, so dst may be an io.Writer returned via a vm.QuerySink provided that d.Malloc is set to vm.Malloc.

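A small helper sketch (the package and function names are invented) that combines Set, Trailer.BlockRange, and Copy to stream the decompressed contents of a packed object:

package example

import (
	"io"
	"os"

	"github.com/SnellerInc/sneller/ion/blockfmt"
)

// decompressObject streams the decompressed contents of a packed object
// into dst, given the object's Trailer (from ReadTrailer or Converter.Trailer).
func decompressObject(src *os.File, t *blockfmt.Trailer, dst io.Writer) (int64, error) {
	var d blockfmt.Decoder
	d.Set(t) // copies Algo and BlockShift from the trailer
	// the compressed blocks occupy [start of block 0, t.Offset)
	start, _ := t.BlockRange(0)
	return d.Copy(dst, io.NewSectionReader(src, start, t.Offset-start))
}
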
func (*Decoder) CopyBytes

func (d *Decoder) CopyBytes(dst io.Writer, src []byte) (int64, error)

CopyBytes incrementally decompresses data from src and writes it to dst. It returns the number of bytes written to dst and the first error encountered, if any. If dst implements ZionWriter and d.Algo is "zion" then compressed data may be passed directly to dst (see ZionWriter for more details).

func (*Decoder) Decompress

func (d *Decoder) Decompress(src io.Reader, dst []byte) (int, error)

Decompress decodes d.Trailer and puts its contents into dst. len(dst) must be equal to d.Trailer.Decompressed(). src must read the data that is referenced by d.Trailer.

func (*Decoder) Set

func (d *Decoder) Set(t *Trailer)

Set copies the [Algo] and [BlockShift] fields from [t] into [d].

type Descriptor

type Descriptor struct {
	// ObjectInfo describes *this*
	// object's full path, ETag, and format.
	ObjectInfo
	// Trailer is the trailer that is part
	// of the object.
	Trailer Trailer
}

Descriptor describes a single object within an Index.

func ReadDescriptor

func ReadDescriptor(d ion.Datum) (*Descriptor, error)

ReadDescriptor reads a descriptor from an ion datum.

func ReadDescriptors

func ReadDescriptors(d ion.Datum) ([]Descriptor, error)

ReadDescriptors reads a list of descriptors from an ion datum.

func (*Descriptor) Decode

func (d *Descriptor) Decode(td *TrailerDecoder, v ion.Datum, opts Flag) error

func (*Descriptor) Encode

func (d *Descriptor) Encode(buf *ion.Buffer, st *ion.Symtab)

type DirFS

type DirFS struct {
	fs.FS
	Root        string
	Log         func(f string, args ...interface{})
	MinPartSize int
}

DirFS is an InputFS and UploadFS that is rooted in a particular directory.

func NewDirFS

func NewDirFS(dir string) *DirFS

NewDirFS creates a new DirFS in dir.

func (*DirFS) Create

func (d *DirFS) Create(fullpath string) (Uploader, error)

Create implements UploadFS.Create

func (*DirFS) ETag

func (d *DirFS) ETag(fullpath string, info fs.FileInfo) (string, error)

ETag implements InputFS.ETag

func (*DirFS) Mmap

func (d *DirFS) Mmap(fullpath string) ([]byte, error)

Mmap maps the file given by [fullpath]. The caller must ensure that the returned slice, if non-nil, is unmapped with [Unmap]. NOTE: Mmap is not supported on all platforms. The caller should be prepared to handle the error and fall back to ordinary [Open] and [Read] calls.

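A hedged sketch of the fallback pattern suggested above: try Mmap first and fall back to an ordinary read when mapping is unsupported. The helper name and the file being read are illustrative.

package main

import (
	"io/fs"
	"log"

	"github.com/SnellerInc/sneller/ion/blockfmt"
)

// readObject tries Mmap first and falls back to an ordinary read when
// mapping is not supported; the returned func releases the buffer.
func readObject(d *blockfmt.DirFS, path string) ([]byte, func(), error) {
	if buf, err := d.Mmap(path); err == nil {
		return buf, func() { _ = d.Unmap(buf) }, nil
	}
	buf, err := fs.ReadFile(d, path)
	return buf, func() {}, err
}

func main() {
	d := blockfmt.NewDirFS(".")
	buf, release, err := readObject(d, "go.mod") // any readable file
	if err != nil {
		log.Fatal(err)
	}
	defer release()
	log.Printf("read %d bytes", len(buf))
}
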
func (*DirFS) Prefix

func (d *DirFS) Prefix() string

Prefix implements InputFS.Prefix

func (*DirFS) Remove

func (d *DirFS) Remove(fullpath string) error

Remove removes the file at the specified path.

func (*DirFS) Unmap

func (d *DirFS) Unmap(buf []byte) error

Unmap unmaps a buffer returned by [Mmap].

func (*DirFS) WriteFile

func (d *DirFS) WriteFile(fullpath string, buf []byte) (string, error)

WriteFile implements UploadFS.WriteFile

type FileTree

type FileTree struct {

	// Backing is the backing store for the tree.
	// Backing must be set to perform tree operations.
	Backing UploadFS
	// contains filtered or unexported fields
}

FileTree is a B+-tree that holds a list of (path, etag, id) triples in sorted order by path.

func (*FileTree) Append

func (f *FileTree) Append(path, etag string, id int) (bool, error)

Append assigns an ID to a path and etag. Append returns (true, nil) if the (path, etag) tuple is inserted, or (false, nil) if the (path, etag) tuple has already been inserted. A tuple may be inserted under the following conditions:

  1. The path has never been appended before.
  2. The (path, etag) pair has been inserted with an id >= 0, and it is being marked as failed by being re-inserted with an id < 0.
  3. The same path but a different etag has been marked as failed (with id < 0), and Append is overwriting the previous entry with a new etag and id >= 0.

Otherwise, if there exists a (path, etag) tuple with a matching path but non-matching etag, then (false, ErrETagChanged) is returned.

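A hedged sketch of the insert rules above; the backing directory, paths, and etags are invented for illustration.

package main

import (
	"errors"
	"log"

	"github.com/SnellerInc/sneller/ion/blockfmt"
)

func main() {
	var ft blockfmt.FileTree
	ft.Backing = blockfmt.NewDirFS("/tmp/index") // hypothetical backing store
	ok, err := ft.Append("db/input-0001.json", "etag-1", 0)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("first insert:", ok) // true
	ok, _ = ft.Append("db/input-0001.json", "etag-1", 0)
	log.Println("duplicate insert:", ok) // false: already present
	_, err = ft.Append("db/input-0001.json", "etag-2", 1)
	if errors.Is(err, blockfmt.ErrETagChanged) {
		log.Println("the input changed since it was first ingested")
	}
}
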
func (*FileTree) EachFile

func (f *FileTree) EachFile(fn func(filename string)) error

EachFile iterates the backing of f and calls fn for each file that is pointed to by the FileTree (i.e. files that contain either inner nodes or leaf nodes that still constitute part of the tree state).

func (*FileTree) Prefetch

func (f *FileTree) Prefetch(input []Input)

Prefetch takes a list of inputs and prefetches inner nodes or leaves that are likely to be associated with an insert operation on f. Currently, Prefetch only fetches "down" one level of the tree structure.

func (*FileTree) Reset

func (f *FileTree) Reset()

Reset resets the contents of the tree

func (*FileTree) Walk

func (f *FileTree) Walk(start string, walk func(name, etag string, id int) bool) error

Walk performs an in-order walk of the filetree starting at the first item greater than or equal to start. The walk function is called on each item in turn until it returns false or until an I/O error is encountered.

type Filter

type Filter struct {
	// contains filtered or unexported fields
}

Filter represents a compiled condition that can be evaluated against a SparseIndex to produce intervals that match the compiled condition.

func (*Filter) Compile

func (f *Filter) Compile(e expr.Node)

Compile sets the expression that the filter should evaluate. A call to Compile erases any previously-compiled expression.

func (*Filter) Intervals

func (f *Filter) Intervals(si *SparseIndex) ints.Intervals

Intervals returns the intervals within [si] that match the filter.

func (*Filter) MatchesAny

func (f *Filter) MatchesAny(si *SparseIndex) bool

MatchesAny returns true if f matches any non-empty intervals in si, or false otherwise.

func (*Filter) Overlaps

func (f *Filter) Overlaps(si *SparseIndex, start, end int) bool

Overlaps returns whether or not the sparse index matches the predicate within the half-open interval [start, end)

The behavior of Overlaps when start >= end is unspecified.

func (*Filter) Trivial

func (f *Filter) Trivial() bool

Trivial returns true if the compiled filter condition will never select non-trivial subranges of the input slice in Visit. (In other words, a trivial filter will always visit all the blocks in a sparse index.)

func (*Filter) Visit

func (f *Filter) Visit(si *SparseIndex, interval func(start, end int))

Visit visits distinct (non-overlapping) intervals within si that correspond to the compiled filter.

When no range within si matches the filter, interval will be called once with (0, 0).

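A helper sketch (names invented) showing how Compile, Trivial, and Visit combine to narrow a scan to the block intervals of a descriptor that might match a predicate:

package example

import (
	"github.com/SnellerInc/sneller/expr"
	"github.com/SnellerInc/sneller/ion/blockfmt"
)

// blocksToScan returns the half-open block intervals of desc that a query
// with predicate pred still needs to read.
func blocksToScan(pred expr.Node, desc *blockfmt.Descriptor) [][2]int {
	var f blockfmt.Filter
	f.Compile(pred)
	sparse := &desc.Trailer.Sparse
	if f.Trivial() {
		// the predicate cannot exclude anything; scan every block
		return [][2]int{{0, sparse.Blocks()}}
	}
	var out [][2]int
	f.Visit(sparse, func(start, end int) {
		if start < end { // (0, 0) is reported when nothing matches
			out = append(out, [2]int{start, end})
		}
	})
	return out
}
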
type Flag

type Flag int

Flag is an option flag to be passed to DecodeIndex.

const (
	// FlagSkipInputs skips Index.Contents.Inputs
	// when decoding the index. The Inputs list
	// does not need to be read when running queries.
	FlagSkipInputs Flag = 1 << iota
)

type Index

type Index struct {
	// Name is the name of the index.
	Name string
	// Created is the time the index
	// was populated.
	Created date.Time
	// UserData is an arbitrary datum that can be
	// stored with the index and used externally.
	UserData ion.Datum
	// Algo is the compression algorithm used to
	// compress the index contents.
	Algo string

	// Inline is a list of object descriptors
	// that are inlined into the index object.
	//
	// Typically, Inline contains the objects
	// that have been ingested most recently
	// (or are otherwise known to be more likely
	// to be referenced).
	Inline []Descriptor

	// Indirect is the tree that contains
	// all the object descriptors that aren't
	// part of Inline.
	Indirect IndirectTree

	// Inputs is the collection of
	// objects that comprise Inline and Indirect.
	Inputs FileTree

	// ToDelete is a list of items
	// that are no longer referenced
	// by the Index except to indicate
	// that they should be deleted after
	// they have been unreferenced for
	// some period of time.
	ToDelete []Quarantined

	// LastScan is the time at which
	// the last scan operation completed.
	// This may be the zero time if no
	// scan has ever been performed.
	LastScan date.Time
	// Cursors is the list of scanning cursors.
	// These may not be present if no scan
	// has ever been performed.
	Cursors []string
	// Scanning indicates that scanning has
	// not yet completed.
	Scanning bool
}

Index is a collection of formatted objects with a name.

Index objects are stored as MAC'd blobs in order to make it possible to detect tampering of the Contents of the index. (The modtime and ETag of the Contents are part of the signed payload, so we can refuse to operate on those objects if they fail to match the expected modtime and ETag.)

func DecodeIndex

func DecodeIndex(key *Key, index []byte, opts Flag) (*Index, error)

DecodeIndex decodes a signed index (see Sign) and returns the Index, or an error if the index was malformed or the signature doesn't match.

If FlagSkipInputs is passed in opts, this avoids decoding Index.Inputs.

NOTE: the returned Index may contain fields that alias the input slice.

func (*Index) Descs

func (idx *Index) Descs(src InputFS, keep *Filter) ([]Descriptor, []ints.Intervals, int64, error)

Descs collects the list of objects from an index and returns them as a list of descriptors against which queries can be run along with the number of (decompressed) bytes that comprise the returned objects.

If [keep] is non-nil, then blocks which are known to not contain any rows matching [keep] will be excluded from the returned slices.

Note that the returned slices may be empty if the index has no contents.

func (*Index) HasPartition

func (idx *Index) HasPartition(x string) bool

HasPartition returns true if the index can partition descriptors on the top-level field x or false otherwise.

func (*Index) Objects

func (idx *Index) Objects() int

Objects returns the number of packed objects that are pointed to by this Index.

func (*Index) SyncInputs

func (idx *Index) SyncInputs(dir string, expiry time.Duration) error

SyncInputs syncs idx.Inputs to a directory within idx.Inputs.Backing, and queues old input files in idx.ToDelete with the provided expiry relative to the current time. Callers are required to call SyncInputs after updating idx.Inputs.

func (*Index) TimeRange

func (idx *Index) TimeRange(path []string) (min, max date.Time, ok bool)

TimeRange returns the inclusive time range for the given path expression.

type IndexConfig

type IndexConfig struct {
	// MaxInlined is the maximum number of bytes
	// to ingest in a single SyncOutputs operation
	// (not including merging). If MaxInlined is
	// less than or equal to zero, it is ignored
	// and no limit is applied.
	MaxInlined int64
	// TargetSize is the target size of packfiles
	// when compacting.
	TargetSize int64
	// TargetRefSize is the target size of stored
	// indirect references. If this is less than
	// or equal to zero, a default value is used.
	TargetRefSize int64
	// Expiry is the minimum time that a
	// quarantined file should be left around
	// after it has been dereferenced.
	Expiry time.Duration
}

An IndexConfig is a set of configurations for synchronizing an Index.

func (*IndexConfig) Compact

func (c *IndexConfig) Compact(fs UploadFS, lst []Descriptor) ([]Descriptor, []Quarantined, error)

Compact compacts a list of descriptors and returns a new (hopefully shorter) list of descriptors containing the same data along with the list of quarantined descriptor paths that should be deleted.

func (*IndexConfig) SyncOutputs

func (c *IndexConfig) SyncOutputs(idx *Index, ofs UploadFS, dir string) error

SyncOutputs synchronizes idx.Indirect to a directory with the provided UploadFS. SyncOutputs uses c.MaxInlined to determine which (if any) of the leading entries in idx.Inline should be moved into the indirect tree by trimming leading entries until the decompressed size of the data referenced by idx.Inline is less than or equal to c.MaxInlined.

type IndirectRef

type IndirectRef struct {
	ObjectInfo
	// Objects is the number of
	// object references inside
	// the packed file pointed to by Path.
	Objects int
	// OrigObjects is the number of objects
	// that were compacted to produce the
	// packfiles pointed to by Path.
	OrigObjects int
	// contains filtered or unexported fields
}

IndirectRef references an object that contains a list of descriptors.

type IndirectTree

type IndirectTree struct {
	// Refs is the list of objects containing
	// lists of descriptors, from oldest to newest.
	Refs []IndirectRef

	// Sparse describes the intervals within refs
	// that correspond to particular time ranges.
	Sparse SparseIndex
}

IndirectTree is an ordered list of IndirectRefs.

See IndirectTree.Append for adding descriptors.

func (*IndirectTree) OrigObjects

func (i *IndirectTree) OrigObjects() int

OrigObjects returns the total number of objects that have been flushed to the indirect tree.

func (*IndirectTree) Purge

func (i *IndirectTree) Purge(ifs InputFS, keep *Filter, expiry time.Duration) ([]Quarantined, error)

Purge purges entries from the tree that do not satisfy the filter condition specified in [keep]. Entries to be deleted are returned as quarantined path entries.

Purge may elect to purge less than the maximum amount of data possible to purge if it would be prohibitively expensive to produce the quarantine list.

func (*IndirectTree) Search

func (i *IndirectTree) Search(ifs InputFS, filt *Filter) ([]Descriptor, error)

Search traverses the IndirectTree through the backing store (ifs) to produce the list of blobs that match the given predicate.

type Input

type Input struct {
	// Path and ETag are used to
	// populate the ObjectInfo
	// in an Index built from a Converter.
	Path, ETag string
	// Size is the size of the input, in bytes
	Size int64
	// R is the source of unformatted data
	R io.ReadCloser
	// F is the formatter that produces output blocks
	F RowFormat
	// Err is an error specific
	// to this input that is populated
	// by Converter.Run.
	Err error
}

Input is a combination of an input stream and a row-formatting function. Together they produce output blocks.

func CollectGlob

func CollectGlob(ifs InputFS, fallback func(string) RowFormat, pattern string) ([]Input, error)

CollectGlob turns a glob pattern into a list of Inputs, using fallback as the constructor for the RowFormat of each input object when the object suffix does not match any of the known format suffixes. If any of the files that match the glob pattern do not have known file suffixes and fallback does not return a non-nil RowFormat for those files, then CollectGlob will return an error indicating that the format for the file could not be determined.

type InputFS

type InputFS interface {
	fs.FS
	fsutil.ETagFS

	// Prefix should return a string
	// that is prepended to filesystem
	// paths to indicate the filesystem "origin."
	//
	// For example, an S3 bucket FS would have
	//   s3://bucket/
	// as its prefix.
	Prefix() string
}

InputFS describes the FS implementation that is required for reading inputs.

type Key

type Key [KeyLength]byte

Key is a shared secret key used to sign encoded Indexes.

type MultiWriter

type MultiWriter struct {
	// Output is the Uploader used to
	// upload parts to backing storage.
	Output Uploader
	// Algo is the compression algorithm
	// used to compress blocks.
	Algo string
	// InputAlign is the expected size
	// of input blocks that are provided
	// to io.Write in each stream.
	InputAlign int
	// TargetSize is the target size of
	// each output part written to backing
	// storage.
	TargetSize int

	// MinChunksPerBlock is the desired
	// number of chunks per block.
	// If it is set, then metadata blocks
	// are coalesced so that they are at
	// least this size (unless the total
	// number of chunks is less than
	// MinChunksPerBlock).
	MinChunksPerBlock int

	// Trailer is the trailer that
	// is appended to the output stream.
	// The fields in Trailer are only
	// valid once MultiWriter.Close has
	// been called.
	Trailer
	// contains filtered or unexported fields
}

MultiWriter is a multi-stream writer that turns multiple streams of input blocks into a single output stream of compressed blocks.

MultiWriter tries to keep blocks written by each stream close together in the output so that sparse indexes that are built on each of the streams end up pointing to contiguous regions of output.

func (*MultiWriter) Close

func (m *MultiWriter) Close() error

Close closes the MultiWriter. Close is only safe to call once Close has been called on each outstanding output stream created by calling Open.

Once Close returns, the Trailer field will be populated with the trailer that was assembled from all the constituent spans of input data.

func (*MultiWriter) Open

func (m *MultiWriter) Open() (io.WriteCloser, error)

Open opens a stream for writing output. Each call to io.Write on the provided io.WriteCloser must be m.InputAlign bytes. The stream must be closed before the MultiWriter can be closed. Blocks written to a single stream are coalesced into large, contiguous "spans" of blocks in order to reduce index fragmentation caused by writing multiple streams of output parts simultaneously.

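A hedged fan-in sketch of the pattern described above: several goroutines each own one output stream and write aligned blocks to it. SkipChecks and the zero-filled placeholder blocks are testing shortcuts, not how production data is written.

package main

import (
	"io"
	"log"
	"sync"

	"github.com/SnellerInc/sneller/ion/blockfmt"
)

func main() {
	m := &blockfmt.MultiWriter{
		Output:     &blockfmt.BufferUploader{PartSize: 8 << 20},
		Algo:       "zstd",
		InputAlign: 1 << 20,
	}
	m.SkipChecks() // placeholder data below is not real ion; testing only
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		w, err := m.Open()
		if err != nil {
			log.Fatal(err)
		}
		wg.Add(1)
		go func(w io.WriteCloser) {
			defer wg.Done()
			if _, err := w.Write(make([]byte, 1<<20)); err != nil {
				log.Println("write:", err)
			}
			if err := w.Close(); err != nil {
				log.Println("close:", err)
			}
		}(w)
	}
	wg.Wait()
	if err := m.Close(); err != nil {
		log.Fatal(err)
	}
	log.Printf("assembled %d blocks", len(m.Trailer.Blocks))
}
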
func (*MultiWriter) SkipChecks

func (m *MultiWriter) SkipChecks()

SkipChecks disables some runtime checks of the input data, which is ordinarily expected to be ion data. Do not use this except for testing.

type ObjectInfo

type ObjectInfo struct {
	// Path is the path to the
	// object. See fs.ValidPath
	// for a description of what constitutes
	// a valid path.
	//
	// ETag is the ETag of the object.
	// The ETag is opaque to the blockfmt
	// implementation.
	Path, ETag string
	// LastModified is the mtime of
	// the object. We use both the ETag
	// and the mtime to determine whether
	// an object has been modified since
	// we last looked at it.
	LastModified date.Time
	// Format specifies the format
	// of the object. For output
	// objects, the format indicates
	// the blockfmt version used to
	// write the ion object. For input
	// objects, the format describes the
	// conversion algorithm suffix used
	// to convert the object (see SuffixToFormat).
	Format string

	// Size, if non-zero, is the size of
	// the object. (Output objects are never 0 bytes.)
	Size int64
}

ObjectInfo is a collection of information about an object.

type Quarantined

type Quarantined struct {
	Expiry date.Time
	Path   string
}

Quarantined is an item that is queued for GC but has not yet been deleted.

type Range

type Range interface {
	Path() []string
	Min() ion.Datum
	Max() ion.Datum
}

Range describes the (closed) interval that the value of a particular path expression could occupy

func NewRange

func NewRange(path []string, min, max ion.Datum) Range

type RowFormat

type RowFormat interface {
	// Convert should read data from r and write
	// rows into dst. For each row written to dst,
	// the provided list of constants should also be inserted.
	Convert(r io.Reader, dst *ion.Chunker, constants []ion.Field) error
	// Name is the name of the format
	// that will be included in an index description.
	Name() string
}

RowFormat is the interface through which input streams are converted into aligned output blocks.

func MustSuffixToFormat

func MustSuffixToFormat(suffix string) RowFormat

func UnsafeION

func UnsafeION() RowFormat

UnsafeION converts raw ion by decoding and re-encoding it.

NOTE: UnsafeION is called UnsafeION because the ion package has not been hardened against arbitrary user input. FIXME: harden the ion package against malicious input and then rename this to something else.

type S3FS

type S3FS struct {
	s3.BucketFS
}

S3FS implements UploadFS and InputFS.

func (*S3FS) Create

func (s *S3FS) Create(path string) (Uploader, error)

Create implements UploadFS.Create

func (*S3FS) ETag

func (s *S3FS) ETag(fullpath string, f fs.FileInfo) (string, error)

ETag implements InputFS.ETag

func (*S3FS) Prefix

func (s *S3FS) Prefix() string

Prefix implements InputFS.Prefix

func (*S3FS) WriteFile

func (s *S3FS) WriteFile(path string, contents []byte) (string, error)

WriteFile implements UploadFS.WriteFile

type SparseIndex

type SparseIndex struct {
	// contains filtered or unexported fields
}

func (*SparseIndex) Append

func (s *SparseIndex) Append(next *SparseIndex) bool

Append tries to append next to s and returns true if the append operation was successful, or false otherwise. (Append will fail if the set of indices tracked in each SparseIndex is not the same.) The block positions in next are assumed to start at s.Blocks().

func (*SparseIndex) AppendBlocks

func (s *SparseIndex) AppendBlocks(next *SparseIndex, i, j int) bool

AppendBlocks is like Append, but only appends blocks from next from block i up to block j.

This will panic if i > j or j > next.Blocks().

func (*SparseIndex) Blocks

func (s *SparseIndex) Blocks() int

func (*SparseIndex) Clone

func (s *SparseIndex) Clone() SparseIndex

Clone produces a deep copy of s.

func (*SparseIndex) Const

func (s *SparseIndex) Const(x string) (ion.Datum, bool)

Const extracts the datum associated with the constant x from the sparse index, or returns (ion.Empty, false) if no such datum exists.

func (*SparseIndex) Encode

func (s *SparseIndex) Encode(dst *ion.Buffer, st *ion.Symtab)

func (*SparseIndex) FieldNames

func (s *SparseIndex) FieldNames() []string

FieldNames returns the list of field names using '.' as a separator between the path components. NOTE: FieldNames does not escape the '.' character inside field names themselves, so the textual result of each field name may be ambiguous.

func (*SparseIndex) Fields

func (s *SparseIndex) Fields() int

Fields returns the number of individually indexed fields.

func (*SparseIndex) Get

func (s *SparseIndex) Get(path []string) *TimeIndex

Get gets a TimeIndex associated with a path. The returned TimeIndex may be nil if no such index exists.

func (*SparseIndex) MinMax

func (s *SparseIndex) MinMax(path []string) (min, max date.Time, ok bool)

func (*SparseIndex) Push

func (s *SparseIndex) Push(rng []Range)

func (*SparseIndex) Slice

func (s *SparseIndex) Slice(i, j int) SparseIndex

Slice produces a sparse index for just the blocks in the half-open interval [i, j). Slice will panic if i is greater than j, i is less than zero, or j is greater than the number of blocks in the index.

func (*SparseIndex) Trim

func (s *SparseIndex) Trim(j int) SparseIndex

Trim produces a copy of s that only includes information up to block j. Trim will panic if j is greater than s.Blocks().

Trim is equivalent to s.Slice(0, j)

type Template

type Template struct {
	Field string // Field is the name of the field to be generated.
	// Eval should generate an ion datum
	// from the input object.
	Eval func(in *Input) (ion.Datum, error)
}

Template is a templated constant field.

type TimeIndex

type TimeIndex struct {
	// contains filtered or unexported fields
}

TimeIndex maintains a lossy mapping of time ranges to "blocks", where the time range -> block mapping is preserved precisely if the ranges are monotonic with respect to the block number. TimeIndex does not care about what constitutes a "block"; it merely maintains a linear mapping from timestamps to integers.

TimeIndex can answer leftmost- and rightmost-bound queries for timestamp values with respect to the range of values inserted via Push.

Because TimeIndex stores a monotonic list of time ranges and blocks, its serialized encoding is space-efficient, as the timestamps and block numbers can be delta-encoded.

See TimeIndex.Push, TimeIndex.Start, and TimeIndex.End

func (*TimeIndex) Append

func (t *TimeIndex) Append(next *TimeIndex)

Append concatenates t and next so that the ranges indexed by next occur immediately after the ranges indexed by t.

func (*TimeIndex) Blocks

func (t *TimeIndex) Blocks() int

Blocks returns the number of blocks in the index.

func (*TimeIndex) Clone

func (t *TimeIndex) Clone() TimeIndex

Clone produces a deep copy of t.

func (*TimeIndex) Contains

func (t *TimeIndex) Contains(when date.Time) bool

Contains returns true if the value 'when' could appear within this index, or false otherwise. Note that Contains is sensitive to holes in the index.

func (*TimeIndex) EditLatest

func (t *TimeIndex) EditLatest(min, max date.Time)

EditLatest extends the range associated with the most recent call to Push. (EditLatest has no effect if min is not lower than the previous minimum and max is not higher than the previous maximum.)

func (*TimeIndex) Encode

func (t *TimeIndex) Encode(dst *ion.Buffer, st *ion.Symtab)

func (*TimeIndex) End

func (t *TimeIndex) End(when date.Time) int

End produces the highest offset (exclusive) at which the time 'when' could occur in the input block list. In other words, for a return value N, blocks [0, N) could contain the value "when".

func (*TimeIndex) EndIntervals

func (t *TimeIndex) EndIntervals() int

EndIntervals returns the number of distinct values that t.End could return.

func (*TimeIndex) Max

func (t *TimeIndex) Max() (date.Time, bool)

func (*TimeIndex) Min

func (t *TimeIndex) Min() (date.Time, bool)

func (*TimeIndex) Push

func (t *TimeIndex) Push(start, end date.Time)

Push pushes one new block to the index with the associated start and end times.

If the time range specified in Push overlaps with block ranges that are already part of the index, those ranges will be coalesced into the union of the two ranges. In other words, overlapping ranges (or non-monotonic inserts more generally) will cause the precision of the TimeIndex mapping to relax until it can guarantee that it can maintain a monotonic time-to-block mapping. In the most degenerate case, the TimeIndex will simply map the minimum seen time to block 0 and maximum seen time to block N.

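A small sketch of how Push, Start, and End interact; the timestamps are arbitrary, and the example assumes the companion date package provides FromTime.

package main

import (
	"log"
	"time"

	"github.com/SnellerInc/sneller/date"
	"github.com/SnellerInc/sneller/ion/blockfmt"
)

func main() {
	day := func(d int) date.Time {
		return date.FromTime(time.Date(2024, time.January, d, 0, 0, 0, 0, time.UTC))
	}
	var t blockfmt.TimeIndex
	t.Push(day(1), day(2)) // block 0 spans Jan 1-2
	t.Push(day(2), day(3)) // block 1 spans Jan 2-3
	t.Push(day(3), day(4)) // block 2 spans Jan 3-4
	when := day(2)
	// only blocks in [Start, End) can contain rows with timestamp == when
	log.Printf("candidate blocks for %v: [%d, %d)", when, t.Start(when), t.End(when))
}
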
func (*TimeIndex) PushEmpty

func (t *TimeIndex) PushEmpty(num int)

PushEmpty pushes num empty blocks to the index. The index should have more than zero entries already present (i.e. Push should have been called at least once).

func (*TimeIndex) Reset

func (t *TimeIndex) Reset()

Reset removes all the values from t.

func (*TimeIndex) Start

func (t *TimeIndex) Start(when date.Time) int

Start produces the lowest offset (inclusive) at which the time 'when' could occur in the input block list.

func (*TimeIndex) StartIntervals

func (t *TimeIndex) StartIntervals() int

StartIntervals returns the number of distinct values that t.Start could return.

func (*TimeIndex) String

func (t *TimeIndex) String() string

String implements fmt.Stringer

type TimeRange

type TimeRange struct {
	// contains filtered or unexported fields
}

func (*TimeRange) Max

func (r *TimeRange) Max() ion.Datum

func (*TimeRange) MaxTime

func (r *TimeRange) MaxTime() date.Time

func (*TimeRange) Min

func (r *TimeRange) Min() ion.Datum

func (*TimeRange) MinTime

func (r *TimeRange) MinTime() date.Time

func (*TimeRange) Path

func (r *TimeRange) Path() []string

func (*TimeRange) Union

func (r *TimeRange) Union(t *TimeRange)

type Trailer

type Trailer struct {
	// Version is an indicator
	// of the encoded trailer format version
	Version int
	// Offset is the offset of the trailer
	// within the output stream.
	Offset int64
	// Algo is the name of the compression
	// algorithm used to compress blocks
	Algo string
	// BlockShift is the alignment of each block
	// when it is fully decompressed (in bits)
	//
	// For example, BlockShift of 20 means that
	// blocks are 1MB (1 << 20) bytes each.
	BlockShift int
	// Blocks is the list of descriptors
	// for each block.
	Blocks []Blockdesc
	// Sparse contains a lossy secondary index
	// of timestamp ranges and constant fields
	// within Blocks.
	Sparse SparseIndex
}

Trailer is a collection of block descriptions.

func ReadTrailer

func ReadTrailer(src io.ReaderAt, size int64) (*Trailer, error)

ReadTrailer reads a trailer from an io.ReaderAt that has a backing size of 'size'.

func (*Trailer) BlockRange

func (t *Trailer) BlockRange(i int) (start, end int64)

BlockRange returns the start and end offsets of block [i] within the object.

func (*Trailer) BlockSize

func (t *Trailer) BlockSize(i int) int64

BlockSize returns the compressed size of block [i] within the object.

func (*Trailer) Decode

func (t *Trailer) Decode(st *ion.Symtab, body []byte) error

Decode decodes a trailer encoded using Encode.

func (*Trailer) Decompressed

func (t *Trailer) Decompressed() int64

Decompressed returns the decompressed size of all of the data within the trailer blocks.

func (*Trailer) DecompressedSize

func (t *Trailer) DecompressedSize(i int) int64

DecompressedSize returns the decompressed size of block [i] within the object.

func (*Trailer) Encode

func (t *Trailer) Encode(dst *ion.Buffer, st *ion.Symtab)

Encode encodes a trailer to the provided buffer using the provided symbol table. Note that Encode may add new symbols to the symbol table.

func (*Trailer) ReadFrom

func (t *Trailer) ReadFrom(src io.ReaderAt, size int64) error

type TrailerDecoder

type TrailerDecoder struct {
	// contains filtered or unexported fields
}

A TrailerDecoder can be used to decode multiple trailers containing related information in a more memory-efficient way than decoding the trailers individually.

func (*TrailerDecoder) Decode

func (d *TrailerDecoder) Decode(v ion.Datum, dst *Trailer) error

Decode decodes a trailer.

type UploadFS

type UploadFS interface {
	InputFS

	// WriteFile should create the
	// file at path with the given contents.
	// If the file already exists, it should
	// be overwritten atomically.
	// WriteFile should return the ETag associated
	// with the written file along with the first encountered error.
	WriteFile(path string, buf []byte) (etag string, err error)

	// Create should create an Uploader
	// for the given path. The file should
	// not be visible at the provided path
	// until the Uploader has been closed
	// successfully.
	Create(path string) (Uploader, error)
}

UploadFS describes the FS implementation that is required for writing outputs.

type Uploader

type Uploader interface {
	// MinPartSize is the minimum supported
	// part size for the Uploader.
	MinPartSize() int
	// Upload should upload contents
	// as the given part number.
	// Part numbers may be sparse, but
	// they will always be positive and non-zero.
	// Upload is not required to handle
	// len(contents) < MinPartSize().
	Upload(part int64, contents []byte) error
	// Close should append final to the
	// object contents and then finalize
	// the object. Close must handle
	// len(final) < MinPartSize().
	Close(final []byte) error
	// Size should return the final size
	// of the uploaded object. It is only
	// required to return a valid value
	// after Close has been called.
	Size() int64
}

Uploader describes what we expect an object store upload API to look like.

(Take a look at aws/s3.Uploader.)

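To make the contract concrete, here is a hedged sketch of a custom Uploader that buffers parts in memory and concatenates them, in part-number order, into a local file on Close; the type name and 5 MiB minimum are illustrative, not prescribed by this package.

package example

import (
	"os"
	"sort"
	"sync"

	"github.com/SnellerInc/sneller/ion/blockfmt"
)

// fileUploader buffers parts in memory and concatenates them, in
// part-number order, into a local file when Close is called.
type fileUploader struct {
	mu    sync.Mutex
	parts map[int64][]byte
	path  string
	size  int64
}

// compile-time check that fileUploader satisfies blockfmt.Uploader
var _ blockfmt.Uploader = (*fileUploader)(nil)

func (u *fileUploader) MinPartSize() int { return 5 << 20 } // e.g. the S3 multipart minimum

func (u *fileUploader) Upload(part int64, contents []byte) error {
	u.mu.Lock()
	defer u.mu.Unlock()
	if u.parts == nil {
		u.parts = make(map[int64][]byte)
	}
	u.parts[part] = append([]byte(nil), contents...) // copy; caller may reuse contents
	return nil
}

func (u *fileUploader) Close(final []byte) error {
	u.mu.Lock()
	defer u.mu.Unlock()
	nums := make([]int64, 0, len(u.parts))
	for n := range u.parts {
		nums = append(nums, n)
	}
	sort.Slice(nums, func(i, j int) bool { return nums[i] < nums[j] })
	f, err := os.Create(u.path)
	if err != nil {
		return err
	}
	defer f.Close()
	for _, n := range nums {
		k, err := f.Write(u.parts[n])
		if err != nil {
			return err
		}
		u.size += int64(k)
	}
	// final may be smaller than MinPartSize; it is always written last
	k, err := f.Write(final)
	u.size += int64(k)
	return err
}

func (u *fileUploader) Size() int64 { return u.size }
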
type ZionWriter

type ZionWriter interface {
	// ConfigureZion is called with the set of
	// top-level path components that the caller
	// expects the callee to handle. ConfigureZion
	// should return true if the callee can handle
	// extracting the provided field list directly
	// from encoded zion data, or false if it cannot.
	ConfigureZion(blocksize int64, fields []string) bool
}

ZionWriter is an optional interface implemented by an io.Writer passed to Decoder.CopyBytes or Decoder.Copy. An io.Writer that implements ZionWriter may receive raw zion-encoded data rather than decompressed ion data.
