Documentation ¶
Overview ¶
Package fileset provides access to files through file sets.
A file set is the basic unit of storage for files. File sets come in two types: primitive and composite. A primitive file set stores a list of files added and a list of files deleted. A composite file set composes multiple composite and / or primitive file sets into layers which can be merged to produce a new file set. Composite file sets allow file operations to be applied lazily, which can be very beneficial for improving write / storage costs.
File sets are ephemeral by default, so unreferenced file sets will eventually be garbage collected. Processing that involves creating and using file sets should be done inside a renewer to keep the file sets alive while the processing is occurring.
File sets can be created with a file set writer and files can be retrieved by opening a file set with optional indexing and filtering, then iterating through the files. Files must be added to a file set in lexicographical order and files are emitted by a file set in lexicographical order. Various file set wrappers are available that can perform computations and provide filtering based on the files emitted by a file set.
Backwards compatibility:
The algorithms for file and index chunking have changed throughout 2.x, and we must support previously written data. Here are some examples of conditions in past data which current code will not generate:
- a file that is split across multiple chunks may share some of them with other files (in the current code, a file split across chunks will be the only file in those chunks).
- related, even small files may be split across multiple chunks
- an index range data reference may start part way through a chunk
Index ¶
- Constants
- Variables
- func Clean(p string, isDir bool) string
- func CopyDeletedFiles(ctx context.Context, w *Writer, fs FileSet, opts ...index.Option) error
- func CopyFiles(ctx context.Context, w *Writer, fs FileSet, opts ...index.Option) error
- func CreatePostgresCacheV1(ctx context.Context, tx *pachsql.Tx) error
- func IDsToHexStrings(ids []ID) []string
- func IsClean(x string, isDir bool) bool
- func IsDir(p string) bool
- func SetupPostgresStoreV0(ctx context.Context, tx *pachsql.Tx) error
- func SizeFromIndex(idx *index.Index) (size int64)
- func StoreTestSuite(t *testing.T, newStore func(t testing.TB) MetadataStore)
- func WriteTarEntry(ctx context.Context, w io.Writer, f File) error
- func WriteTarStream(ctx context.Context, w io.Writer, fs FileSet) error
- type Buffer
- func (b *Buffer) Add(path, datum string) io.Writer
- func (b *Buffer) Copy(file File, datum string)
- func (b *Buffer) Count() int
- func (b *Buffer) Delete(path, datum string)
- func (b *Buffer) Empty() bool
- func (b *Buffer) WalkAdditive(onAdd func(path, datum string, r io.Reader) error, ...) error
- func (b *Buffer) WalkDeletive(cb func(path, datum string) error) error
- type Cache
- type CompactCallback
- type CompactionConfig
- type Composite
- func (*Composite) Descriptor() ([]byte, []int)
- func (m *Composite) GetLayers() []string
- func (m *Composite) Marshal() (dAtA []byte, err error)
- func (x *Composite) MarshalLogObject(enc zapcore.ObjectEncoder) error
- func (m *Composite) MarshalTo(dAtA []byte) (int, error)
- func (m *Composite) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (c *Composite) PointsTo() ([]ID, error)
- func (*Composite) ProtoMessage()
- func (m *Composite) Reset()
- func (m *Composite) Size() (n int)
- func (m *Composite) String() string
- func (m *Composite) Unmarshal(dAtA []byte) error
- func (m *Composite) XXX_DiscardUnknown()
- func (m *Composite) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Composite) XXX_Merge(src proto.Message)
- func (m *Composite) XXX_Size() int
- func (m *Composite) XXX_Unmarshal(b []byte) error
- type File
- type FileReader
- type FileSet
- type ID
- type Iterator
- type MergeFileReader
- type MergeReader
- func (mr *MergeReader) Iterate(ctx context.Context, cb func(File) error, opts ...index.Option) error
- func (mr *MergeReader) IterateDeletes(ctx context.Context, cb func(File) error, opts ...index.Option) error
- func (mr *MergeReader) Shards(ctx context.Context, opts ...index.Option) ([]*index.PathRange, error)
- type Metadata
- func (*Metadata) Descriptor() ([]byte, []int)
- func (m *Metadata) GetComposite() *Composite
- func (m *Metadata) GetPrimitive() *Primitive
- func (m *Metadata) GetValue() isMetadata_Value
- func (m *Metadata) Marshal() (dAtA []byte, err error)
- func (x *Metadata) MarshalLogObject(enc zapcore.ObjectEncoder) error
- func (m *Metadata) MarshalTo(dAtA []byte) (int, error)
- func (m *Metadata) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*Metadata) ProtoMessage()
- func (m *Metadata) Reset()
- func (m *Metadata) Size() (n int)
- func (m *Metadata) String() string
- func (m *Metadata) Unmarshal(dAtA []byte) error
- func (m *Metadata) XXX_DiscardUnknown()
- func (m *Metadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Metadata) XXX_Merge(src proto.Message)
- func (*Metadata) XXX_OneofWrappers() []interface{}
- func (m *Metadata) XXX_Size() int
- func (m *Metadata) XXX_Unmarshal(b []byte) error
- type MetadataStore
- type Metadata_Composite
- type Metadata_Primitive
- type Primitive
- func (*Primitive) Descriptor() ([]byte, []int)
- func (m *Primitive) GetAdditive() *index.Index
- func (m *Primitive) GetDeletive() *index.Index
- func (m *Primitive) GetSizeBytes() int64
- func (m *Primitive) Marshal() (dAtA []byte, err error)
- func (x *Primitive) MarshalLogObject(enc zapcore.ObjectEncoder) error
- func (m *Primitive) MarshalTo(dAtA []byte) (int, error)
- func (m *Primitive) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (p *Primitive) PointsTo() []chunk.ID
- func (*Primitive) ProtoMessage()
- func (m *Primitive) Reset()
- func (m *Primitive) Size() (n int)
- func (m *Primitive) String() string
- func (m *Primitive) Unmarshal(dAtA []byte) error
- func (m *Primitive) XXX_DiscardUnknown()
- func (m *Primitive) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Primitive) XXX_Merge(src proto.Message)
- func (m *Primitive) XXX_Size() int
- func (m *Primitive) XXX_Unmarshal(b []byte) error
- type Reader
- type Renewer
- type Storage
- func (s *Storage) ChunkStorage() *chunk.Storage
- func (s *Storage) CloneTx(tx *pachsql.Tx, id ID, ttl time.Duration) (*ID, error)
- func (s *Storage) Compact(ctx context.Context, ids []ID, ttl time.Duration, opts ...index.Option) (*ID, error)
- func (s *Storage) CompactLevelBased(ctx context.Context, ids []ID, maxFanIn int, ttl time.Duration, ...) (*ID, error)
- func (s *Storage) Compose(ctx context.Context, ids []ID, ttl time.Duration) (*ID, error)
- func (s *Storage) ComposeTx(tx *pachsql.Tx, ids []ID, ttl time.Duration) (*ID, error)
- func (s *Storage) Concat(ctx context.Context, ids []ID, ttl time.Duration) (*ID, error)
- func (s *Storage) Drop(ctx context.Context, id ID) error
- func (s *Storage) Flatten(ctx context.Context, ids []ID, cb func(id ID) error) error
- func (s *Storage) FlattenAll(ctx context.Context, ids []ID) ([]ID, error)
- func (s *Storage) IsCompacted(ctx context.Context, id ID) (bool, error)
- func (s *Storage) NewGC(d time.Duration) *track.GarbageCollector
- func (s *Storage) NewUnorderedWriter(ctx context.Context, opts ...UnorderedWriterOption) (*UnorderedWriter, error)
- func (s *Storage) NewWriter(ctx context.Context, opts ...WriterOption) *Writer
- func (s *Storage) Open(ctx context.Context, ids []ID) (FileSet, error)
- func (s *Storage) SetTTL(ctx context.Context, id ID, ttl time.Duration) (time.Time, error)
- func (s *Storage) Shard(ctx context.Context, ids []ID, pathRange *index.PathRange) ([]*index.PathRange, error)
- func (s *Storage) ShardConfig() *index.ShardConfig
- func (s *Storage) Size(ctx context.Context, id ID) (int64, error)
- func (s *Storage) SizeUpperBound(ctx context.Context, id ID) (int64, error)
- func (s *Storage) WithRenewer(ctx context.Context, ttl time.Duration, ...) (retErr error)
- type StorageOption
- func StorageOptions(conf *serviceenv.StorageConfiguration) []StorageOption
- func WithLevelFactor(x int64) StorageOption
- func WithMaxOpenFileSets(max int) StorageOption
- func WithMemoryThreshold(threshold int64) StorageOption
- func WithShardCountThreshold(threshold int64) StorageOption
- func WithShardSizeThreshold(threshold int64) StorageOption
- type TestCacheValue
- func (*TestCacheValue) Descriptor() ([]byte, []int)
- func (m *TestCacheValue) GetFileSetId() string
- func (m *TestCacheValue) Marshal() (dAtA []byte, err error)
- func (x *TestCacheValue) MarshalLogObject(enc zapcore.ObjectEncoder) error
- func (m *TestCacheValue) MarshalTo(dAtA []byte) (int, error)
- func (m *TestCacheValue) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*TestCacheValue) ProtoMessage()
- func (m *TestCacheValue) Reset()
- func (m *TestCacheValue) Size() (n int)
- func (m *TestCacheValue) String() string
- func (m *TestCacheValue) Unmarshal(dAtA []byte) error
- func (m *TestCacheValue) XXX_DiscardUnknown()
- func (m *TestCacheValue) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *TestCacheValue) XXX_Merge(src proto.Message)
- func (m *TestCacheValue) XXX_Size() int
- func (m *TestCacheValue) XXX_Unmarshal(b []byte) error
- type UnorderedWriter
- func (uw *UnorderedWriter) AddFileSet(ctx context.Context, id ID) error
- func (uw *UnorderedWriter) Close(ctx context.Context) (*ID, error)
- func (uw *UnorderedWriter) Copy(ctx context.Context, fs FileSet, datum string, appendFile bool, ...) error
- func (uw *UnorderedWriter) Delete(ctx context.Context, p, datum string) error
- func (uw *UnorderedWriter) Put(ctx context.Context, p, datum string, appendFile bool, r io.Reader) (retErr error)
- type UnorderedWriterOption
- type Writer
- type WriterOption
Constants ¶
const ( // DefaultMemoryThreshold is the default for the memory threshold that must // be met before a file set part is serialized (excluding close). DefaultMemoryThreshold = units.GB // DefaultCompactionLevelFactor is the default factor that level sizes increase by in a compacted fileset. DefaultCompactionLevelFactor = 10 DefaultPrefetchLimit = 10 DefaultBatchThreshold = units.MB // DefaultIndexCacheSize is the default size of the index cache. DefaultIndexCacheSize = 100 // TrackerPrefix is used for creating tracker objects for filesets TrackerPrefix = "fileset/" // DefaultFileDatum is the default file datum. DefaultFileDatum = "default" )
const CacheTrackerPrefix = "cache/"
Variables ¶
var ( ErrInvalidLengthFileset = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowFileset = fmt.Errorf("proto: integer overflow") ErrUnexpectedEndOfGroupFileset = fmt.Errorf("proto: unexpected end of group") )
var ( // ErrFileSetExists path already exists ErrFileSetExists = fmt.Errorf("path already exists") // ErrFileSetNotExists path does not exist ErrFileSetNotExists = fmt.Errorf("path does not exist") // ErrNoTTLSet no ttl set on path ErrNoTTLSet = fmt.Errorf("no ttl set on path") )
var ( // ErrNoFileSetFound is returned by the methods on Storage when a fileset does not exist ErrNoFileSetFound = errors.Errorf("no fileset found") )
Functions ¶
func CopyDeletedFiles ¶
CopyDeletedFiles copies the deleted files from a file set to a file set writer.
func CreatePostgresCacheV1 ¶
CreatePostgresCacheV1 creates the table for a cache. DO NOT MODIFY THIS FUNCTION IT HAS BEEN USED IN A RELEASED MIGRATION
func IDsToHexStrings ¶
func SetupPostgresStoreV0 ¶
SetupPostgresStoreV0 sets up the tables for a Store DO NOT MODIFY THIS FUNCTION IT HAS BEEN USED IN A RELEASED MIGRATION
func SizeFromIndex ¶
func StoreTestSuite ¶
func StoreTestSuite(t *testing.T, newStore func(t testing.TB) MetadataStore)
StoreTestSuite is a suite of tests for a Store.
func WriteTarEntry ¶
WriteTarEntry writes a tar entry for f to w.
Types ¶
type Buffer ¶
type Buffer struct {
// contains filtered or unexported fields
}
func (*Buffer) Count ¶
Count gives the number of paths tracked in the buffer, meant as a proxy for metadata memory usage
func (*Buffer) WalkAdditive ¶
type CompactCallback ¶
CompactCallback is the standard callback signature for a compaction operation.
type CompactionConfig ¶
type CompactionConfig struct {
LevelFactor int64
}
type Composite ¶
type Composite struct { Layers []string `protobuf:"bytes,1,rep,name=layers,proto3" json:"layers,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*Composite) Descriptor ¶
func (*Composite) MarshalLogObject ¶
func (x *Composite) MarshalLogObject(enc zapcore.ObjectEncoder) error
func (*Composite) MarshalToSizedBuffer ¶
func (*Composite) PointsTo ¶
PointsTo returns the IDs of the filesets which this composite fileset points to
func (*Composite) ProtoMessage ¶
func (*Composite) ProtoMessage()
func (*Composite) XXX_DiscardUnknown ¶
func (m *Composite) XXX_DiscardUnknown()
func (*Composite) XXX_Marshal ¶
func (*Composite) XXX_Unmarshal ¶
type File ¶
type File interface { // Index returns the index for the file. Index() *index.Index // Content writes the content of the file. Content(ctx context.Context, w io.Writer, opts ...chunk.ReaderOption) error // Hash returns the hash of the file. Hash(ctx context.Context) ([]byte, error) }
File represents a file.
type FileReader ¶
type FileReader struct {
// contains filtered or unexported fields
}
FileReader is an abstraction for reading a file.
func (*FileReader) Content ¶
func (fr *FileReader) Content(ctx context.Context, w io.Writer, opts ...chunk.ReaderOption) error
Content writes the content of the file.
func (*FileReader) Hash ¶
func (fr *FileReader) Hash(_ context.Context) ([]byte, error)
Hash returns the hash of the file.
func (*FileReader) Index ¶
func (fr *FileReader) Index() *index.Index
Index returns the index for the file.
type FileSet ¶
type FileSet interface { // Iterate iterates over the files in the file set. Iterate(ctx context.Context, cb func(File) error, opts ...index.Option) error // IterateDeletes iterates over the deleted files in the file set. IterateDeletes(ctx context.Context, cb func(File) error, opts ...index.Option) error // Shards returns a list of shards for the file set. Shards(ctx context.Context, opts ...index.Option) ([]*index.PathRange, error) }
FileSet represents a set of files.
func NewDirInserter ¶
NewDirInserter creates a file set that inserts directory entries.
func NewIndexFilter ¶
NewIndexFilter filters fs using predicate.
func NewIndexMapper ¶
NewIndexMapper performs a map operation on the index entries of the files in the file set.
func NewPrefetcher ¶
type ID ¶
type ID [16]byte
ID is the unique identifier for a fileset
func HexStringsToIDs ¶
type Iterator ¶
type Iterator struct {
// contains filtered or unexported fields
}
Iterator provides functionality for imperative iteration over a file set.
func NewIterator ¶
type MergeFileReader ¶
type MergeFileReader struct {
// contains filtered or unexported fields
}
MergeFileReader is an abstraction for reading a merged file.
func (*MergeFileReader) Content ¶
func (mfr *MergeFileReader) Content(ctx context.Context, w io.Writer, opts ...chunk.ReaderOption) error
Content returns the content of the merged file.
func (*MergeFileReader) Hash ¶
func (mfr *MergeFileReader) Hash(ctx context.Context) ([]byte, error)
Hash returns the hash of the file. TODO: It would be good to remove this potential performance footgun, but it would require removing the append functionality.
func (*MergeFileReader) Index ¶
func (mfr *MergeFileReader) Index() *index.Index
Index returns the index for the merged file. TODO: Removed clone because it had a significant performance impact for small files. May want to revisit.
type MergeReader ¶
type MergeReader struct {
// contains filtered or unexported fields
}
MergeReader is an abstraction for reading merged file sets.
func (*MergeReader) Iterate ¶
func (mr *MergeReader) Iterate(ctx context.Context, cb func(File) error, opts ...index.Option) error
Iterate iterates over the files in the merge reader.
func (*MergeReader) IterateDeletes ¶
type Metadata ¶
type Metadata struct { // Types that are valid to be assigned to Value: // *Metadata_Primitive // *Metadata_Composite Value isMetadata_Value `protobuf_oneof:"value"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*Metadata) Descriptor ¶
func (*Metadata) GetComposite ¶
func (*Metadata) GetPrimitive ¶
func (*Metadata) MarshalLogObject ¶
func (x *Metadata) MarshalLogObject(enc zapcore.ObjectEncoder) error
func (*Metadata) MarshalToSizedBuffer ¶
func (*Metadata) ProtoMessage ¶
func (*Metadata) ProtoMessage()
func (*Metadata) XXX_DiscardUnknown ¶
func (m *Metadata) XXX_DiscardUnknown()
func (*Metadata) XXX_Marshal ¶
func (*Metadata) XXX_OneofWrappers ¶
func (*Metadata) XXX_OneofWrappers() []interface{}
XXX_OneofWrappers is for the internal use of the proto package.
func (*Metadata) XXX_Unmarshal ¶
type MetadataStore ¶
type MetadataStore interface { DB() *pachsql.DB SetTx(tx *pachsql.Tx, id ID, md *Metadata) error Get(ctx context.Context, id ID) (*Metadata, error) GetTx(tx *pachsql.Tx, id ID) (*Metadata, error) DeleteTx(tx *pachsql.Tx, id ID) error Exists(ctx context.Context, id ID) (bool, error) }
MetadataStore stores filesets. A fileset is a path -> index relationship. All filesets exist in the same keyspace and can be merged by prefix.
func NewPostgresStore ¶
func NewPostgresStore(db *pachsql.DB) MetadataStore
NewPostgresStore returns a Store backed by db. TODO: Expose configuration for cache size?
func NewTestStore ¶
NewTestStore returns a Store scoped to the lifetime of the test.
type Metadata_Composite ¶
type Metadata_Composite struct {
Composite *Composite `protobuf:"bytes,2,opt,name=composite,proto3,oneof" json:"composite,omitempty"`
}
func (*Metadata_Composite) MarshalTo ¶
func (m *Metadata_Composite) MarshalTo(dAtA []byte) (int, error)
func (*Metadata_Composite) MarshalToSizedBuffer ¶
func (m *Metadata_Composite) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*Metadata_Composite) Size ¶
func (m *Metadata_Composite) Size() (n int)
type Metadata_Primitive ¶
type Metadata_Primitive struct {
Primitive *Primitive `protobuf:"bytes,1,opt,name=primitive,proto3,oneof" json:"primitive,omitempty"`
}
func (*Metadata_Primitive) MarshalTo ¶
func (m *Metadata_Primitive) MarshalTo(dAtA []byte) (int, error)
func (*Metadata_Primitive) MarshalToSizedBuffer ¶
func (m *Metadata_Primitive) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*Metadata_Primitive) Size ¶
func (m *Metadata_Primitive) Size() (n int)
type Primitive ¶
type Primitive struct { Deletive *index.Index `protobuf:"bytes,1,opt,name=deletive,proto3" json:"deletive,omitempty"` Additive *index.Index `protobuf:"bytes,2,opt,name=additive,proto3" json:"additive,omitempty"` SizeBytes int64 `protobuf:"varint,3,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*Primitive) Descriptor ¶
func (*Primitive) GetAdditive ¶
func (*Primitive) GetDeletive ¶
func (*Primitive) GetSizeBytes ¶
func (*Primitive) MarshalLogObject ¶
func (x *Primitive) MarshalLogObject(enc zapcore.ObjectEncoder) error
func (*Primitive) MarshalToSizedBuffer ¶
func (*Primitive) PointsTo ¶
PointsTo returns a slice of the chunk.IDs which this fileset immediately points to. Transitively reachable chunks are not included in the slice.
func (*Primitive) ProtoMessage ¶
func (*Primitive) ProtoMessage()
func (*Primitive) XXX_DiscardUnknown ¶
func (m *Primitive) XXX_DiscardUnknown()
func (*Primitive) XXX_Marshal ¶
func (*Primitive) XXX_Unmarshal ¶
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader is an abstraction for reading a file set.
func (*Reader) IterateDeletes ¶
type Renewer ¶
type Renewer struct {
// contains filtered or unexported fields
}
Renewer renews file sets by ID. It is a renew.StringSet wrapped for type safety.
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage is an abstraction for interfacing with file sets. A storage instance: - Provides methods for writing file sets, opening file sets for reading, and managing file sets. - Manages tracker state to keep internal file sets alive while writing file sets. - Manages an internal index cache that supports logarithmic lookup in the multilevel indexes. - Provides methods for processing file set compaction tasks. - Provides a method for creating a garbage collector.
func NewStorage ¶
func NewStorage(mds MetadataStore, tr track.Tracker, chunks *chunk.Storage, opts ...StorageOption) *Storage
NewStorage creates a new Storage.
func NewTestStorage ¶
func NewTestStorage(ctx context.Context, t testing.TB, db *pachsql.DB, tr track.Tracker, opts ...StorageOption) *Storage
NewTestStorage constructs a local storage instance scoped to the lifetime of the test
func (*Storage) ChunkStorage ¶
ChunkStorage returns the underlying chunk storage instance for this storage instance.
func (*Storage) CloneTx ¶
CloneTx creates a new fileset, identical to the fileset at id, but with the specified ttl. The ttl can be ignored by using track.NoTTL
func (*Storage) Compact ¶
func (s *Storage) Compact(ctx context.Context, ids []ID, ttl time.Duration, opts ...index.Option) (*ID, error)
Compact compacts the contents of ids into a new file set with the specified ttl and returns the ID. Compact always returns the ID of a primitive file set. Compact does not renew ids. It is the responsibility of the caller to renew ids. In some cases they may be permanent and not require renewal.
func (*Storage) CompactLevelBased ¶
func (s *Storage) CompactLevelBased(ctx context.Context, ids []ID, maxFanIn int, ttl time.Duration, compact CompactCallback) (*ID, error)
CompactLevelBased performs a level-based compaction on the passed in file sets.
func (*Storage) Compose ¶
Compose produces a composite fileset from the filesets under ids. It does not perform a merge or check the filesets at ids in any way other than ensuring that they exist.
func (*Storage) ComposeTx ¶
ComposeTx produces a composite fileset from the filesets under ids. It does not perform a merge or check the filesets at ids in any way other than ensuring that they exist.
func (*Storage) Concat ¶
Concat is a special case of Merge, where the filesets each contain paths for distinct ranges. The path ranges must be non-overlapping and the ranges must be lexicographically sorted. Concat always returns the ID of a primitive fileset.
func (*Storage) Flatten ¶
Flatten iterates through IDs and replaces references to composite file sets with all their layers in place and executes the user provided callback against each primitive file set.
func (*Storage) FlattenAll ¶
FlattenAll is like Flatten, but collects the primitives to return to the user.
func (*Storage) IsCompacted ¶
IsCompacted returns true if the file sets are already in a compacted form.
func (*Storage) NewUnorderedWriter ¶
func (s *Storage) NewUnorderedWriter(ctx context.Context, opts ...UnorderedWriterOption) (*UnorderedWriter, error)
NewUnorderedWriter creates a new unordered file set writer.
func (*Storage) NewWriter ¶
func (s *Storage) NewWriter(ctx context.Context, opts ...WriterOption) *Writer
NewWriter creates a new file set writer.
func (*Storage) Shard ¶
func (s *Storage) Shard(ctx context.Context, ids []ID, pathRange *index.PathRange) ([]*index.PathRange, error)
Shard shards the file set into path ranges. TODO This should be extended to be more configurable (different criteria for creating shards).
func (*Storage) ShardConfig ¶
func (s *Storage) ShardConfig() *index.ShardConfig
func (*Storage) SizeUpperBound ¶
SizeUpperBound returns an upper bound for the size of the data in the file set in bytes. The upper bound is cheaper to compute than the actual size.
type StorageOption ¶
type StorageOption func(*Storage)
StorageOption configures a storage.
func StorageOptions ¶
func StorageOptions(conf *serviceenv.StorageConfiguration) []StorageOption
StorageOptions returns the fileset storage options for the config.
func WithLevelFactor ¶
func WithLevelFactor(x int64) StorageOption
WithLevelFactor sets the factor by which the sizes of levels increase in a compacted fileset.
func WithMaxOpenFileSets ¶
func WithMaxOpenFileSets(max int) StorageOption
WithMaxOpenFileSets sets the maximum number of filesets that will be open (potentially buffered in memory) at a time.
func WithMemoryThreshold ¶
func WithMemoryThreshold(threshold int64) StorageOption
WithMemoryThreshold sets the memory threshold that must be met before a file set part is serialized (excluding close).
func WithShardCountThreshold ¶
func WithShardCountThreshold(threshold int64) StorageOption
WithShardCountThreshold sets the count threshold that must be met before a shard is created by the shard function.
func WithShardSizeThreshold ¶
func WithShardSizeThreshold(threshold int64) StorageOption
WithShardSizeThreshold sets the size threshold that must be met before a shard is created by the shard function.
type TestCacheValue ¶
type TestCacheValue struct { FileSetId string `protobuf:"bytes,1,opt,name=file_set_id,json=fileSetId,proto3" json:"file_set_id,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*TestCacheValue) Descriptor ¶
func (*TestCacheValue) Descriptor() ([]byte, []int)
func (*TestCacheValue) GetFileSetId ¶
func (m *TestCacheValue) GetFileSetId() string
func (*TestCacheValue) Marshal ¶
func (m *TestCacheValue) Marshal() (dAtA []byte, err error)
func (*TestCacheValue) MarshalLogObject ¶
func (x *TestCacheValue) MarshalLogObject(enc zapcore.ObjectEncoder) error
func (*TestCacheValue) MarshalToSizedBuffer ¶
func (m *TestCacheValue) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*TestCacheValue) ProtoMessage ¶
func (*TestCacheValue) ProtoMessage()
func (*TestCacheValue) Reset ¶
func (m *TestCacheValue) Reset()
func (*TestCacheValue) Size ¶
func (m *TestCacheValue) Size() (n int)
func (*TestCacheValue) String ¶
func (m *TestCacheValue) String() string
func (*TestCacheValue) Unmarshal ¶
func (m *TestCacheValue) Unmarshal(dAtA []byte) error
func (*TestCacheValue) XXX_DiscardUnknown ¶
func (m *TestCacheValue) XXX_DiscardUnknown()
func (*TestCacheValue) XXX_Marshal ¶
func (m *TestCacheValue) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*TestCacheValue) XXX_Merge ¶
func (m *TestCacheValue) XXX_Merge(src proto.Message)
func (*TestCacheValue) XXX_Size ¶
func (m *TestCacheValue) XXX_Size() int
func (*TestCacheValue) XXX_Unmarshal ¶
func (m *TestCacheValue) XXX_Unmarshal(b []byte) error
type UnorderedWriter ¶
type UnorderedWriter struct {
// contains filtered or unexported fields
}
UnorderedWriter allows writing Files, unordered by path, into multiple ordered filesets. This may be a full filesystem or a subfilesystem (e.g. datum / datum set / shard).
func (*UnorderedWriter) AddFileSet ¶
func (uw *UnorderedWriter) AddFileSet(ctx context.Context, id ID) error
func (*UnorderedWriter) Close ¶
func (uw *UnorderedWriter) Close(ctx context.Context) (*ID, error)
Close closes the writer.
type UnorderedWriterOption ¶
type UnorderedWriterOption func(*UnorderedWriter)
UnorderedWriterOption configures an UnorderedWriter.
func WithCompact ¶
func WithCompact(maxFanIn int) UnorderedWriterOption
WithCompact sets the unordered writer to compact the created file sets if they exceed the passed in fan in.
func WithParentID ¶
func WithParentID(getParentID func() (*ID, error)) UnorderedWriterOption
WithParentID sets a factory for the parent fileset ID for the unordered writer. This is used for converting directory deletions into a set of point deletes for the files contained within the directory.
func WithRenewal ¶
func WithRenewal(ttl time.Duration, r *Renewer) UnorderedWriterOption
WithRenewal configures the UnorderedWriter to renew subfileset paths with the provided renewer.
func WithValidator ¶
func WithValidator(validator func(string) error) UnorderedWriterOption
WithValidator sets the validator for paths being written to the unordered writer.
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer provides functionality for writing a file set.