package chunk

v2.7.0-nightly.20230526
Published: May 25, 2023 License: Apache-2.0

Documentation

Overview

Package chunk provides access to data through content-addressed chunks.

A chunk is the basic unit of storage for data. Chunks are identified by their content-address, which is a hash of the data stored in the chunk. There are two mechanisms for uploading chunks: uploader and batcher. The uploader is intended for medium/large data entries since it performs content-defined chunking on the data and stores each data entry in its own set of chunks. The batcher is intended for small data entries since it batches multiple data entries into larger chunks. The result of each of these upload methods is a list of data references for each data entry. These data references can be used to access the corresponding data through readers.
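
For example, a round trip through the uploader and a reader could look like the following sketch (illustrative, not from this package's docs; it assumes the package is imported as chunk and s is a *Storage from NewStorage, with bytes, context, and strings imported):

func uploadAndRead(ctx context.Context, s *chunk.Storage) ([]byte, error) {
	var dataRefs []*chunk.DataRef
	u := s.NewUploader(ctx, "example-upload", false, func(meta interface{}, refs []*chunk.DataRef) error {
		dataRefs = refs // capture the data references for the entry uploaded below
		return nil
	})
	if err := u.Upload(nil, strings.NewReader("hello, chunks")); err != nil {
		return nil, err
	}
	// Close flushes outstanding upload tasks and runs the callbacks.
	if err := u.Close(); err != nil {
		return nil, err
	}
	var buf bytes.Buffer
	if err := s.NewReader(ctx, dataRefs).Get(&buf); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}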

Index

Constants

const (
	DefaultAverageBits  = 23
	DefaultSeed         = 1
	DefaultMinChunkSize = 1 * units.MB
	DefaultMaxChunkSize = 20 * units.MB
)

const (
	// TrackerPrefix is the prefix used when creating tracker objects for chunks
	TrackerPrefix = "chunk/"

	DefaultPrefetchLimit = 10
)

const (
	// WindowSize is the size of the rolling hash window.
	WindowSize = 64
)

Variables

var (
	ErrInvalidLengthChunk        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowChunk          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupChunk = fmt.Errorf("proto: unexpected end of group")
)

var CompressionAlgo_name = map[int32]string{
	0: "NONE",
	1: "GZIP_BEST_SPEED",
}

var CompressionAlgo_value = map[string]int32{
	"NONE":            0,
	"GZIP_BEST_SPEED": 1,
}

var EncryptionAlgo_name = map[int32]string{
	0: "ENCRYPTION_ALGO_UNKNOWN",
	1: "CHACHA20",
}

var EncryptionAlgo_value = map[string]int32{
	"ENCRYPTION_ALGO_UNKNOWN": 0,
	"CHACHA20":                1,
}

Functions

func ComputeChunks

func ComputeChunks(r io.Reader, cb func([]byte) error) error

ComputeChunks splits a stream of bytes into chunks using a content-defined chunking algorithm. To prevent suboptimal chunk sizes, a minimum and maximum chunk size is enforced. This algorithm is useful for ensuring that typical data modifications (insertions, deletions, updates) only affect a small number of chunks. TODO: Expose configuration.
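
For instance, a caller might record the size of each chunk produced for a stream (a sketch; chunkSizes is an illustrative helper, and io is imported):

// chunkSizes collects the length of every content-defined chunk in r.
func chunkSizes(r io.Reader) ([]int, error) {
	var sizes []int
	if err := chunk.ComputeChunks(r, func(data []byte) error {
		sizes = append(sizes, len(data))
		return nil
	}); err != nil {
		return nil, err
	}
	return sizes, nil
}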

func Get

func Get(ctx context.Context, client Client, ref *Ref, cb kv.ValueCallback) error

Get calls client.Get to retrieve a chunk, then verifies, decrypts, and decompresses the data. cb is called with the uncompressed plaintext.
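
A minimal sketch of retrieving a chunk's plaintext, assuming client and ref were obtained elsewhere and that kv.ValueCallback has the underlying type func([]byte) error:

var plaintext []byte
if err := chunk.Get(ctx, client, ref, func(data []byte) error {
	// Copy the bytes, in case data is only valid for the duration of the callback.
	plaintext = append([]byte{}, data...)
	return nil
}); err != nil {
	return err
}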

func NewPostgresKeyStore

func NewPostgresKeyStore(db *pachsql.DB) *postgresKeyStore

func SetupPostgresStoreV0

func SetupPostgresStoreV0(tx *pachsql.Tx) error

SetupPostgresStoreV0 sets up tables in db. DO NOT MODIFY THIS FUNCTION. IT HAS BEEN USED IN A RELEASED MIGRATION.

Types

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

Batcher batches entries into chunks. Entries are buffered until they exceed the configured threshold, at which point a chunk is created. Chunk creation is asynchronous with respect to the client, which is why the interface is callback-based. Batcher provides one of two callback-based interfaces, defined by ChunkFunc and EntryFunc. Callbacks are executed in the order the entries are added (for the ChunkFunc interface, entries are ordered within as well as across calls).
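
A sketch of batching small entries with the EntryFunc interface, reusing s from the overview sketch (illustrative names; the threshold is assumed to be a byte count, and fmt is imported):

b := s.NewBatcher(ctx, "example-batcher", 1<<20,
	chunk.WithEntryCallback(func(meta interface{}, ref *chunk.DataRef) error {
		// Called once per added entry, in insertion order; ref is nil for
		// size-zero entries.
		fmt.Printf("entry %v stored at %v\n", meta, ref)
		return nil
	}),
)
if err := b.Add("entry-1", []byte("small payload"), nil); err != nil {
	return err
}
// Close flushes any buffered entries into a final chunk.
if err := b.Close(); err != nil {
	return err
}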

func (*Batcher) Add

func (b *Batcher) Add(meta interface{}, data []byte, pointsTo []*DataRef) error

func (*Batcher) Close

func (b *Batcher) Close() error

type BatcherOption

type BatcherOption func(b *Batcher)

func WithChunkCallback

func WithChunkCallback(cb ChunkFunc) BatcherOption

func WithEntryCallback

func WithEntryCallback(cb EntryFunc) BatcherOption

type ChunkFunc

type ChunkFunc = func([]interface{}, *DataRef) error

ChunkFunc is a function that provides the metadata for the entries in a chunk and a data reference to the chunk.

type Client

type Client interface {
	Create(ctx context.Context, md Metadata, chunkData []byte) (ID, error)
	Get(ctx context.Context, chunkID ID, cb kv.ValueCallback) error
	Close() error
}

Client mediates access to a content-addressed store.

func NewClient

func NewClient(store kv.Store, db *pachsql.DB, tr track.Tracker, renewer *Renewer) Client

NewClient returns a Client backed by store, recording chunk metadata in db and tracker state in tr; renewer keeps created chunks alive.

type CompressionAlgo

type CompressionAlgo int32
const (
	CompressionAlgo_NONE            CompressionAlgo = 0
	CompressionAlgo_GZIP_BEST_SPEED CompressionAlgo = 1
)

func (CompressionAlgo) EnumDescriptor

func (CompressionAlgo) EnumDescriptor() ([]byte, []int)

func (CompressionAlgo) String

func (x CompressionAlgo) String() string

type CreateOptions

type CreateOptions struct {
	Secret      []byte
	Compression CompressionAlgo
}

CreateOptions affect how chunks are created.

type DataReader

type DataReader struct {
	// contains filtered or unexported fields
}

DataReader is an abstraction that lazily reads data referenced by a data reference.

func (*DataReader) Read

func (dr *DataReader) Read(data []byte) (int, error)

type DataRef

type DataRef struct {
	// The chunk the referenced data is located in.
	Ref *Ref `protobuf:"bytes,1,opt,name=ref,proto3" json:"ref,omitempty"`
	// The hash of the data being referenced.
	Hash []byte `protobuf:"bytes,2,opt,name=hash,proto3" json:"hash,omitempty"`
	// The offset and size used for accessing the data within the chunk.
	OffsetBytes          int64    `protobuf:"varint,3,opt,name=offset_bytes,json=offsetBytes,proto3" json:"offset_bytes,omitempty"`
	SizeBytes            int64    `protobuf:"varint,4,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

DataRef is a reference to data within a chunk.

func FullRef

func FullRef(dataRef *DataRef) *DataRef

FullRef creates a data reference for the full chunk referenced by a data reference.

func NewDataRef

func NewDataRef(chunkRef *DataRef, chunkBytes []byte, offset, size int64) *DataRef

func (*DataRef) Descriptor

func (*DataRef) Descriptor() ([]byte, []int)

func (*DataRef) GetHash

func (m *DataRef) GetHash() []byte

func (*DataRef) GetOffsetBytes

func (m *DataRef) GetOffsetBytes() int64

func (*DataRef) GetRef

func (m *DataRef) GetRef() *Ref

func (*DataRef) GetSizeBytes

func (m *DataRef) GetSizeBytes() int64

func (*DataRef) Marshal

func (m *DataRef) Marshal() (dAtA []byte, err error)

func (*DataRef) MarshalLogObject

func (x *DataRef) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*DataRef) MarshalTo

func (m *DataRef) MarshalTo(dAtA []byte) (int, error)

func (*DataRef) MarshalToSizedBuffer

func (m *DataRef) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*DataRef) ProtoMessage

func (*DataRef) ProtoMessage()

func (*DataRef) Reset

func (m *DataRef) Reset()

func (*DataRef) Size

func (m *DataRef) Size() (n int)

func (*DataRef) String

func (m *DataRef) String() string

func (*DataRef) Unmarshal

func (m *DataRef) Unmarshal(dAtA []byte) error

func (*DataRef) XXX_DiscardUnknown

func (m *DataRef) XXX_DiscardUnknown()

func (*DataRef) XXX_Marshal

func (m *DataRef) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*DataRef) XXX_Merge

func (m *DataRef) XXX_Merge(src proto.Message)

func (*DataRef) XXX_Size

func (m *DataRef) XXX_Size() int

func (*DataRef) XXX_Unmarshal

func (m *DataRef) XXX_Unmarshal(b []byte) error

type EncryptionAlgo

type EncryptionAlgo int32
const (
	EncryptionAlgo_ENCRYPTION_ALGO_UNKNOWN EncryptionAlgo = 0
	EncryptionAlgo_CHACHA20                EncryptionAlgo = 1
)

func (EncryptionAlgo) EnumDescriptor

func (EncryptionAlgo) EnumDescriptor() ([]byte, []int)

func (EncryptionAlgo) String

func (x EncryptionAlgo) String() string

type Entry

type Entry struct {
	ChunkID   ID     `db:"chunk_id"`
	Gen       uint64 `db:"gen"`
	Uploaded  bool   `db:"uploaded"`
	Tombstone bool   `db:"tombstone"`
}

Entry is a chunk object mapping.

type EntryFunc

type EntryFunc = func(interface{}, *DataRef) error

EntryFunc is a function that provides the metadata for an entry and a data reference to the entry in a chunk. Size zero entries will have a nil data reference.

type GarbageCollector

type GarbageCollector struct {
	// contains filtered or unexported fields
}

GarbageCollector removes unused chunks from object storage.

func NewGC

func NewGC(s *Storage, d time.Duration) *GarbageCollector

NewGC returns a new garbage collector operating on s.

func (*GarbageCollector) RunForever

func (gc *GarbageCollector) RunForever(ctx context.Context) error

RunForever calls RunOnce until the context is cancelled, logging any errors.

func (*GarbageCollector) RunOnce

func (gc *GarbageCollector) RunOnce(ctx context.Context) (retErr error)

RunOnce runs one cycle of garbage collection.
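
A typical wiring sketch (the period below is illustrative; log and time are imported):

gc := chunk.NewGC(s, 10*time.Minute)
go func() {
	// RunForever loops RunOnce until ctx is cancelled.
	if err := gc.RunForever(ctx); err != nil {
		log.Printf("chunk GC stopped: %v", err)
	}
}()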

type ID

type ID []byte

ID uniquely identifies a chunk. It is the hash of its content.

func Hash

func Hash(data []byte) ID

Hash produces an ID by hashing data.

func IDFromHex

func IDFromHex(h string) (ID, error)

IDFromHex parses a hex string into an ID.
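
Hash, HexString, and IDFromHex round-trip, e.g.:

id := chunk.Hash([]byte("some data"))
h := id.HexString()
id2, err := chunk.IDFromHex(h)
// err is nil and bytes.Equal(id, id2) holds.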

func ParseTrackerID

func ParseTrackerID(trackerID string) (ID, error)

ParseTrackerID parses a tracker ID into a chunk ID.

func (ID) HexString

func (id ID) HexString() string

HexString hex-encodes the ID.

func (ID) String

func (id ID) String() string

func (ID) TrackerID

func (id ID) TrackerID() string

TrackerID returns an ID for use with the tracker.

type KeyStore

type KeyStore interface {
	Create(ctx context.Context, name string, data []byte) error
	Get(ctx context.Context, name string) ([]byte, error)
}

KeyStore is a store for named secret keys.

type Metadata

type Metadata struct {
	Size     int
	PointsTo []ID
}

Metadata holds metadata about a chunk.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads data from chunk storage.

func (*Reader) Get

func (r *Reader) Get(w io.Writer) (retErr error)

Get writes the concatenation of the data referenced by the reader's data references to w.

type ReaderOption

type ReaderOption func(*Reader)

func WithOffsetBytes

func WithOffsetBytes(offsetBytes int64) ReaderOption

func WithPrefetchLimit

func WithPrefetchLimit(limit int) ReaderOption
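
For example, reusing s and dataRefs from the overview sketch (WithOffsetBytes is assumed to apply to the concatenated data):

r := s.NewReader(ctx, dataRefs,
	chunk.WithOffsetBytes(1024), // skip the first KiB
	chunk.WithPrefetchLimit(chunk.DefaultPrefetchLimit),
)
var buf bytes.Buffer
if err := r.Get(&buf); err != nil {
	return err
}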

type Ref

type Ref struct {
	Id                   []byte          `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	SizeBytes            int64           `protobuf:"varint,2,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"`
	Edge                 bool            `protobuf:"varint,3,opt,name=edge,proto3" json:"edge,omitempty"`
	Dek                  []byte          `protobuf:"bytes,4,opt,name=dek,proto3" json:"dek,omitempty"`
	EncryptionAlgo       EncryptionAlgo  `` /* 130-byte string literal not displayed */
	CompressionAlgo      CompressionAlgo `` /* 134-byte string literal not displayed */
	XXX_NoUnkeyedLiteral struct{}        `json:"-"`
	XXX_unrecognized     []byte          `json:"-"`
	XXX_sizecache        int32           `json:"-"`
}

func Create

func Create(ctx context.Context, opts CreateOptions, ptext []byte, createFunc func(ctx context.Context, data []byte) (ID, error)) (*Ref, error)

Create calls createFunc to create a new chunk, first compressing and then encrypting ptext. ptext will not be modified.
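
A sketch with an in-memory createFunc (a real implementation would write to object storage; secret and ptext are assumed caller-provided):

store := map[string][]byte{}
ref, err := chunk.Create(ctx, chunk.CreateOptions{
	Secret:      secret,
	Compression: chunk.CompressionAlgo_GZIP_BEST_SPEED,
}, ptext, func(ctx context.Context, data []byte) (chunk.ID, error) {
	// data is the compressed, encrypted payload; store it under its content hash.
	id := chunk.Hash(data)
	store[id.HexString()] = append([]byte{}, data...)
	return id, nil
})
// ref now references the stored chunk; err reports any failure.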

func (*Ref) Descriptor

func (*Ref) Descriptor() ([]byte, []int)

func (*Ref) GetCompressionAlgo

func (m *Ref) GetCompressionAlgo() CompressionAlgo

func (*Ref) GetDek

func (m *Ref) GetDek() []byte

func (*Ref) GetEdge

func (m *Ref) GetEdge() bool

func (*Ref) GetEncryptionAlgo

func (m *Ref) GetEncryptionAlgo() EncryptionAlgo

func (*Ref) GetId

func (m *Ref) GetId() []byte

func (*Ref) GetSizeBytes

func (m *Ref) GetSizeBytes() int64

func (*Ref) Key

func (r *Ref) Key() pachhash.Output

Key returns a unique key for the Ref suitable for use in hash tables.

func (*Ref) Marshal

func (m *Ref) Marshal() (dAtA []byte, err error)

func (*Ref) MarshalLogObject

func (x *Ref) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*Ref) MarshalTo

func (m *Ref) MarshalTo(dAtA []byte) (int, error)

func (*Ref) MarshalToSizedBuffer

func (m *Ref) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Ref) ProtoMessage

func (*Ref) ProtoMessage()

func (*Ref) Reset

func (m *Ref) Reset()

func (*Ref) Size

func (m *Ref) Size() (n int)

func (*Ref) String

func (m *Ref) String() string

func (*Ref) Unmarshal

func (m *Ref) Unmarshal(dAtA []byte) error

func (*Ref) XXX_DiscardUnknown

func (m *Ref) XXX_DiscardUnknown()

func (*Ref) XXX_Marshal

func (m *Ref) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Ref) XXX_Merge

func (m *Ref) XXX_Merge(src proto.Message)

func (*Ref) XXX_Size

func (m *Ref) XXX_Size() int

func (*Ref) XXX_Unmarshal

func (m *Ref) XXX_Unmarshal(b []byte) error

type Renewer

type Renewer struct {
	// contains filtered or unexported fields
}

func NewRenewer

func NewRenewer(ctx context.Context, tr track.Tracker, name string, ttl time.Duration) *Renewer

func (*Renewer) Add

func (r *Renewer) Add(ctx context.Context, id ID) error

func (*Renewer) Close

func (r *Renewer) Close() error
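
Renewer pairs with NewClient to keep chunks alive while they are being written (a sketch; store, db, and tracker are assumed to be constructed elsewhere, and time is imported):

renewer := chunk.NewRenewer(ctx, tracker, "example-upload", 5*time.Minute)
defer renewer.Close()
client := chunk.NewClient(store, db, tracker, renewer)
defer client.Close()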

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Storage is an abstraction for interfacing with chunk storage. A storage instance:

- Provides methods for uploading data to chunks and downloading data from chunks.
- Manages tracker state to keep chunks alive while uploading.
- Manages an internal chunk cache and work deduplicator (parallel downloads of the same chunk will be deduplicated).

func NewStorage

func NewStorage(objC obj.Client, memCache kv.GetPut, db *pachsql.DB, tracker track.Tracker, opts ...StorageOption) *Storage

NewStorage creates a new Storage.
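
A wiring sketch, assuming objC, memCache, db, and tracker come from the obj, kv, pachsql, and track packages respectively:

s := chunk.NewStorage(objC, memCache, db, tracker,
	chunk.WithCompression(chunk.CompressionAlgo_GZIP_BEST_SPEED),
	chunk.WithMaxConcurrentObjects(50, 50), // illustrative limits
)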

func NewTestStorage

func NewTestStorage(ctx context.Context, t testing.TB, db *pachsql.DB, tr track.Tracker, opts ...StorageOption) (obj.Client, *Storage)

NewTestStorage creates a local storage instance for testing during the lifetime of the test.

func (*Storage) Check

func (s *Storage) Check(ctx context.Context, begin, end []byte, readChunks bool) (int, error)

Check runs an integrity check on the objects in object storage. It checks objects for chunks with IDs in the range [begin, end). As a special case, if len(end) == 0, the end bound is ignored.
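
For example, to check the full ID range and read back chunk contents as well (a sketch; nil bounds are assumed to cover the full range, and the int return is assumed to count checked chunks):

n, err := s.Check(ctx, nil, nil, true) // readChunks=true also verifies chunk data
// n reports how many chunks were examined.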

func (*Storage) List

func (s *Storage) List(ctx context.Context, cb func(id ID) error) error

List lists all of the chunks in object storage.
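
E.g., counting every chunk in object storage:

var count int
if err := s.List(ctx, func(id chunk.ID) error {
	count++
	return nil
}); err != nil {
	return err
}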

func (*Storage) NewBatcher

func (s *Storage) NewBatcher(ctx context.Context, name string, threshold int, opts ...BatcherOption) *Batcher

TODO: Add config for number of entries.

func (*Storage) NewDataReader

func (s *Storage) NewDataReader(ctx context.Context, dataRef *DataRef) *DataReader

func (*Storage) NewDeleter

func (s *Storage) NewDeleter() track.Deleter

NewDeleter creates a deleter for use with a tracker.GC.

func (*Storage) NewReader

func (s *Storage) NewReader(ctx context.Context, dataRefs []*DataRef, opts ...ReaderOption) *Reader

NewReader creates a new Reader.

func (*Storage) NewUploader

func (s *Storage) NewUploader(ctx context.Context, name string, noUpload bool, cb UploadFunc) *Uploader

func (*Storage) PrefetchData

func (s *Storage) PrefetchData(ctx context.Context, dataRef *DataRef) error

type StorageOption

type StorageOption func(s *Storage)

StorageOption configures a storage.

func StorageOptions

func StorageOptions(conf *serviceenv.StorageConfiguration) ([]StorageOption, error)

StorageOptions returns the chunk storage options for the config.

func WithCompression

func WithCompression(algo CompressionAlgo) StorageOption

WithCompression sets the compression algorithm used to compress chunks.

func WithMaxConcurrentObjects

func WithMaxConcurrentObjects(maxDownload, maxUpload int) StorageOption

WithMaxConcurrentObjects sets the maximum number of object writers (upload) and readers (download) that can be open at a time.

func WithObjectCache

func WithObjectCache(fastLayer obj.Client, size int) StorageOption

WithObjectCache adds a cache around the currently configured object client.

func WithSecret

func WithSecret(secret []byte) StorageOption

WithSecret sets the secret used to generate chunk encryption keys.

type UploadFunc

type UploadFunc = func(interface{}, []*DataRef) error

UploadFunc is a function that provides the metadata for a task and the corresponding set of chunk references.

type Uploader

type Uploader struct {
	// contains filtered or unexported fields
}

Uploader uploads chunks. Each upload call creates at least one upload task with the provided metadata. Upload tasks are performed asynchronously, which is why the interface is callback-based. Callbacks are executed in the order the upload tasks are created.

func (*Uploader) Close

func (u *Uploader) Close() error

func (*Uploader) Copy

func (u *Uploader) Copy(meta interface{}, dataRefs []*DataRef) error

Copy performs an upload using a list of data references as the data source. Stable data references will be reused. Unstable data references will have their data downloaded and re-uploaded, as in a normal upload.

func (*Uploader) CopyByReference

func (u *Uploader) CopyByReference(meta interface{}, dataRefs []*DataRef) error

func (*Uploader) Upload

func (u *Uploader) Upload(meta interface{}, r io.Reader) error
