remotestorage

package
v0.40.4
Published: May 19, 2022 License: Apache-2.0 Imports: 35 Imported by: 1

Documentation

Index

Constants

const HedgeDownloadSizeLimit = 4 * 1024 * 1024

Only hedge downloads of ranges < 4MB in length for now.

const MaxFetchSize = 128 * 1024 * 1024

Variables

var ErrInvalidDoltSpecPath = errors.New("invalid dolt spec path")
var ErrRemoteTableFileGet = errors.New("HTTP GET for remote table file failed")
var ErrUploadFailed = errors.New("upload failed")
var HttpError = errors.New("http")
var MaxHedgesPerRequest = 1
var StatsFlusher func(StatsRecorder) = func(StatsRecorder) {}

Functions

func EventsUnaryClientInterceptor

func EventsUnaryClientInterceptor(collector *events.Collector) grpc.UnaryClientInterceptor

func GetHost

func GetHost(err error) string

func GetJsonEncodedRequest

func GetJsonEncodedRequest(err error) (string, error)

func GetRequest

func GetRequest(err error) interface{}

func GetRpc

func GetRpc(err error) string

func GetStatus

func GetStatus(err error) *status.Status

func HashSetToSlices

func HashSetToSlices(hashes hash.HashSet) ([]hash.Hash, [][]byte)

HashSetToSlices takes a HashSet and converts it to a slice of hashes, and a slice of byte slices

func HashesToSlices

func HashesToSlices(hashes []hash.Hash) [][]byte

HashesToSlices takes a list of hashes and converts each hash to a byte slice, returning a slice of byte slices

func HttpPostUpload

func HttpPostUpload(ctx context.Context, httpFetcher HTTPFetcher, post *remotesapi.HttpPostTableFile, contentHash []byte, contentLength int64, body io.ReadCloser) error

func IsChunkStoreRpcErr

func IsChunkStoreRpcErr(err error) bool

func ParseByteSlices

func ParseByteSlices(byteSlices [][]byte) (hash.HashSet, map[hash.Hash]int)

ParseByteSlices takes a slice of byte slices and converts it to a HashSet, and a map from hash to its index in the original slice
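
A small illustrative sketch of round-tripping hashes through these three helpers. The import paths and the hash.Of constructor from github.com/dolthub/dolt/go/store/hash are assumptions; the remotestorage functions and their signatures are as documented above.

package main

import (
	"fmt"

	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
	"github.com/dolthub/dolt/go/store/hash"
)

func main() {
	// Build a couple of hashes from arbitrary content (hash.Of is assumed here).
	h1 := hash.Of([]byte("first chunk"))
	h2 := hash.Of([]byte("second chunk"))

	// []hash.Hash -> [][]byte
	byteSlices := remotestorage.HashesToSlices([]hash.Hash{h1, h2})

	// [][]byte -> hash.HashSet plus a map from hash back to its original index.
	hashSet, indexes := remotestorage.ParseByteSlices(byteSlices)
	fmt.Println(len(hashSet), indexes[h2]) // 2 1

	// hash.HashSet -> ([]hash.Hash, [][]byte) in matching order.
	hashes, raw := remotestorage.HashSetToSlices(hashSet)
	fmt.Println(len(hashes), len(raw)) // 2 2
}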

func RetryingUnaryClientInterceptor

func RetryingUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error

func StatsFlusherToColorError

func StatsFlusherToColorError(r StatsRecorder)

Types

type CacheStats

type CacheStats interface {
	CacheHits() uint32
}

type ChunkCache

type ChunkCache interface {
	// Put puts a slice of chunks into the cache.
	Put(c []nbs.CompressedChunk) bool

	// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty
	// is put in its place.
	Get(h hash.HashSet) map[hash.Hash]nbs.CompressedChunk

	// Has takes a set of hashes and returns the set of hashes that the cache currently does not have in it.
	Has(h hash.HashSet) (absent hash.HashSet)

	// PutChunk puts a single chunk in the cache. true is returned if the chunk was cached successfully,
	// and false is returned if that chunk is already in the cache.
	PutChunk(chunk nbs.CompressedChunk) bool

	// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache
	// between the last time GetAndClearChunksToFlush was called and now.
	GetAndClearChunksToFlush() map[hash.Hash]nbs.CompressedChunk
}

ChunkCache is an interface used for caching chunks
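
A minimal, non-thread-safe sketch of the interface, backed by plain maps. It assumes nbs.CompressedChunk exposes its address via a Hash() method; missing hashes map to the zero CompressedChunk, standing in for the empty-chunk placeholder described above. The package's real caches add synchronization and eviction.

package example

import (
	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
	"github.com/dolthub/dolt/go/store/hash"
	"github.com/dolthub/dolt/go/store/nbs"
)

// mapChunkCache is an illustrative ChunkCache backed by maps.
type mapChunkCache struct {
	cached  map[hash.Hash]nbs.CompressedChunk
	toFlush map[hash.Hash]nbs.CompressedChunk
}

func newMapChunkCache() *mapChunkCache {
	return &mapChunkCache{
		cached:  make(map[hash.Hash]nbs.CompressedChunk),
		toFlush: make(map[hash.Hash]nbs.CompressedChunk),
	}
}

func (c *mapChunkCache) Put(chks []nbs.CompressedChunk) bool {
	for _, ch := range chks {
		c.cached[ch.Hash()] = ch
	}
	return true
}

func (c *mapChunkCache) Get(hs hash.HashSet) map[hash.Hash]nbs.CompressedChunk {
	res := make(map[hash.Hash]nbs.CompressedChunk, len(hs))
	for h := range hs {
		// Missing hashes yield the zero-value CompressedChunk.
		res[h] = c.cached[h]
	}
	return res
}

func (c *mapChunkCache) Has(hs hash.HashSet) hash.HashSet {
	absent := make(hash.HashSet)
	for h := range hs {
		if _, ok := c.cached[h]; !ok {
			absent.Insert(h)
		}
	}
	return absent
}

func (c *mapChunkCache) PutChunk(ch nbs.CompressedChunk) bool {
	if _, ok := c.cached[ch.Hash()]; ok {
		return false
	}
	c.cached[ch.Hash()] = ch
	c.toFlush[ch.Hash()] = ch
	return true
}

func (c *mapChunkCache) GetAndClearChunksToFlush() map[hash.Hash]nbs.CompressedChunk {
	out := c.toFlush
	c.toFlush = make(map[hash.Hash]nbs.CompressedChunk)
	return out
}

// Compile-time check that the sketch satisfies remotestorage.ChunkCache.
var _ remotestorage.ChunkCache = (*mapChunkCache)(nil)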

type ConcurrencyParams

type ConcurrencyParams struct {
	ConcurrentSmallFetches int
	ConcurrentLargeFetches int
	LargeFetchSize         int
}

type DoltChunkStore

type DoltChunkStore struct {
	// contains filtered or unexported fields
}

func NewDoltChunkStore

func NewDoltChunkStore(ctx context.Context, nbf *types.NomsBinFormat, org, repoName, host string, csClient remotesapi.ChunkStoreServiceClient) (*DoltChunkStore, error)

func NewDoltChunkStoreFromPath

func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, path, host string, csClient remotesapi.ChunkStoreServiceClient) (*DoltChunkStore, error)
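
A hedged sketch of wiring a DoltChunkStore up from an existing gRPC connection and then tuning it with the With* options documented below. The generated remotesapi client constructor, the types.Format_Default value, and the org/repo/host strings are assumptions and placeholders, not prescribed values.

package example

import (
	"context"
	"net/http"

	remotesapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1"
	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
	"github.com/dolthub/dolt/go/store/types"
	"google.golang.org/grpc"
)

func newChunkStore(ctx context.Context, conn *grpc.ClientConn) (*remotestorage.DoltChunkStore, error) {
	// Generated gRPC client for the remote chunk store service
	// (constructor name assumed from the generated remotesapi package).
	csClient := remotesapi.NewChunkStoreServiceClient(conn)

	// "org" and "repo" are placeholder identifiers for the remote repository.
	dcs, err := remotestorage.NewDoltChunkStore(ctx, types.Format_Default, "org", "repo", "doltremoteapi.example.com:443", csClient)
	if err != nil {
		return nil, err
	}

	// Optional tuning: swap the HTTP client used for table file transfers and
	// bound download concurrency. Each With* call returns a derived store.
	dcs = dcs.WithHTTPFetcher(&http.Client{}).
		WithDownloadConcurrency(remotestorage.ConcurrencyParams{
			ConcurrentSmallFetches: 64,
			ConcurrentLargeFetches: 2,
			LargeFetchSize:         2 * 1024 * 1024,
		})

	return dcs, nil
}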

func (*DoltChunkStore) AddTableFilesToManifest

func (dcs *DoltChunkStore) AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int) error

AddTableFilesToManifest adds table files to the manifest

func (*DoltChunkStore) Close

func (dcs *DoltChunkStore) Close() error

Close tears down any resources in use by the implementation. After Close(), the ChunkStore may not be used again. It is NOT SAFE to call Close() concurrently with any other ChunkStore method; behavior is undefined and probably crashy.

func (*DoltChunkStore) Commit

func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash) (bool, error)

Commit atomically attempts to persist all novel Chunks and update the persisted root hash from last to current (or keeps it the same). If last doesn't match the root in persistent storage, returns false.

func (*DoltChunkStore) Get

func (dcs *DoltChunkStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, error)

Get the Chunk for the value of the hash in the store. If the hash is absent from the store EmptyChunk is returned.

func (*DoltChunkStore) GetMany

func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, found func(context.Context, *chunks.Chunk)) error

func (*DoltChunkStore) GetManyCompressed

func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, found func(context.Context, nbs.CompressedChunk)) error

GetManyCompressed gets the compressed Chunks with |hashes| from the store. On return, the |found| callback will have been called for every chunk that was found. Any absent chunks are silently ignored.

func (*DoltChunkStore) Has

func (dcs *DoltChunkStore) Has(ctx context.Context, h hash.Hash) (bool, error)

Returns true iff the value at the address |h| is contained in the store

func (*DoltChunkStore) HasMany

func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error)

Returns a new HashSet containing any members of |hashes| that are absent from the store.

func (*DoltChunkStore) PruneTableFiles

func (dcs *DoltChunkStore) PruneTableFiles(ctx context.Context) error

PruneTableFiles deletes old table files that are no longer referenced in the manifest.

func (*DoltChunkStore) Put

func (dcs *DoltChunkStore) Put(ctx context.Context, c chunks.Chunk) error

Put caches c. Upon return, c must be visible to subsequent Get and Has calls, but must not be persistent until a call to Flush(). Put may be called concurrently with other calls to Put(), Get(), GetMany(), Has() and HasMany().

func (*DoltChunkStore) Rebase

func (dcs *DoltChunkStore) Rebase(ctx context.Context) error

Rebase brings this ChunkStore into sync with the persistent storage's current root.

func (*DoltChunkStore) Root

func (dcs *DoltChunkStore) Root(ctx context.Context) (hash.Hash, error)

Root returns the root of the database as of the time the ChunkStore was opened or the most recent call to Rebase.

func (*DoltChunkStore) SetLogger

func (dcs *DoltChunkStore) SetLogger(logger chunks.DebugLogger)

func (*DoltChunkStore) SetRootChunk

func (dcs *DoltChunkStore) SetRootChunk(ctx context.Context, root, previous hash.Hash) error

SetRootChunk changes the root chunk hash from the previous value to the new root.

func (*DoltChunkStore) Size

func (dcs *DoltChunkStore) Size(ctx context.Context) (uint64, error)

func (*DoltChunkStore) Sources

func (dcs *DoltChunkStore) Sources(ctx context.Context) (hash.Hash, []nbs.TableFile, []nbs.TableFile, error)

Sources retrieves the current root hash, a list of all the table files (which may include appendix table files) and a list of only appendix table files

func (*DoltChunkStore) Stats

func (dcs *DoltChunkStore) Stats() interface{}

Stats may return some kind of struct that reports statistics about the ChunkStore instance. The type is implementation-dependent, and impls may return nil

func (*DoltChunkStore) StatsSummary

func (dcs *DoltChunkStore) StatsSummary() string

StatsSummary may return a string containing summarized statistics for this ChunkStore. It must return "Unsupported" if this operation is not supported.

func (*DoltChunkStore) SupportedOperations

func (dcs *DoltChunkStore) SupportedOperations() nbs.TableFileStoreOps

func (*DoltChunkStore) Version

func (dcs *DoltChunkStore) Version() string

Version returns the NomsVersion with which this ChunkStore is compatible.

func (*DoltChunkStore) WithChunkCache

func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore

func (*DoltChunkStore) WithDownloadConcurrency

func (dcs *DoltChunkStore) WithDownloadConcurrency(concurrency ConcurrencyParams) *DoltChunkStore

func (*DoltChunkStore) WithHTTPFetcher

func (dcs *DoltChunkStore) WithHTTPFetcher(fetcher HTTPFetcher) *DoltChunkStore

func (*DoltChunkStore) WithNoopChunkCache

func (dcs *DoltChunkStore) WithNoopChunkCache() *DoltChunkStore

func (*DoltChunkStore) WriteTableFile

func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error

WriteTableFile reads a table file from the provided reader and writes it to the chunk store.
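
A hedged sketch of uploading a local table file and then recording it in the manifest. The fileId, numChunks, and contentHash parameters are placeholders that would come from the table file's own metadata; reopening the file inside getRd assumes the store may call it more than once (for example on upload retries).

package example

import (
	"context"
	"io"
	"os"

	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
)

// uploadTableFile streams a table file from disk into the remote chunk store
// and then adds it to the manifest.
func uploadTableFile(ctx context.Context, dcs *remotestorage.DoltChunkStore, path, fileId string, numChunks int, contentHash []byte) error {
	getRd := func() (io.ReadCloser, uint64, error) {
		// Reopen the file on each call so repeated reads start from the beginning.
		f, err := os.Open(path)
		if err != nil {
			return nil, 0, err
		}
		st, err := f.Stat()
		if err != nil {
			f.Close()
			return nil, 0, err
		}
		return f, uint64(st.Size()), nil
	}

	if err := dcs.WriteTableFile(ctx, fileId, numChunks, contentHash, getRd); err != nil {
		return err
	}
	return dcs.AddTableFilesToManifest(ctx, map[string]int{fileId: numChunks})
}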

type DoltRemoteTableFile

type DoltRemoteTableFile struct {
	// contains filtered or unexported fields
}

DoltRemoteTableFile is an implementation of a TableFile that lives in a DoltChunkStore

func (DoltRemoteTableFile) FileID

func (drtf DoltRemoteTableFile) FileID() string

FileID gets the id of the file

func (DoltRemoteTableFile) NumChunks

func (drtf DoltRemoteTableFile) NumChunks() int

NumChunks returns the number of chunks in a table file

func (DoltRemoteTableFile) Open

Open returns an io.ReadCloser which can be used to read the bytes of a table file.

type DurationEstimator

type DurationEstimator interface {
	// Duration returns the expected |time.Duration| of a Work with |Size| |sz|.
	Duration(sz uint64) time.Duration
}

DurationEstimator returns an estimated duration given the size of a Work

type DurationObserver

type DurationObserver interface {
	// Observe is called by |Hedger| when work is completed. |sz| is the |Size|
	// of the work. |n| specifies which |Run| called by the Hedger completed,
	// with 1 being the first |Run|. |d| is the duration the |Run| function
	// took for the |Run| that completed. |err| is any |error| returned from
	// |Run|.
	Observe(sz uint64, n int, d time.Duration, err error)
}

DurationObserver observes Work completions

type DynamicEstimator

type DynamicEstimator interface {
	DurationEstimator
	DurationObserver
}

DynamicEstimator returns an estimated |Duration| for Work that is dynamically updated by observations from |Observe|.

type EstimateStrategy

type EstimateStrategy struct {
	// contains filtered or unexported fields
}

EstimateStrategy wraps a DurationEstimator. It hedges a Work's |Run| when the elapsed time exceeds the estimated duration for Work of its |Size|.

func NewEstimateStrategy

func NewEstimateStrategy(e DurationEstimator) *EstimateStrategy

NewEstimateStrategy returns a new EstimateStrategy

func (*EstimateStrategy) NextTry

func (s *EstimateStrategy) NextTry(sz uint64, elapsed time.Duration, n int) time.Duration

NextTry implements HedgeStrategy

type ExponentialHedgeStrategy

type ExponentialHedgeStrategy struct {
	// contains filtered or unexported fields
}

ExponentialHedgeStrategy increases the |underlying|'s nextTry by a factor of two for every hedge attempt, including the first attempt.

func NewExponentialHedgeStrategy

func NewExponentialHedgeStrategy(u HedgeStrategy) *ExponentialHedgeStrategy

NewExponentialHedgeStrategy returns a new ExponentialHedgeStrategy

func (*ExponentialHedgeStrategy) NextTry

func (e *ExponentialHedgeStrategy) NextTry(sz uint64, elapsed time.Duration, n int) time.Duration

NextTry implements HedgeStrategy

type FixedHedgeStrategy

type FixedHedgeStrategy struct {
	FixedNextTry time.Duration
}

FixedHedgeStrategy always returns |FixedNextTry| from |NextTry|

func NewFixedHedgeStrategy

func NewFixedHedgeStrategy(fixedNextTry time.Duration) *FixedHedgeStrategy

NewFixedHedgeStrategy returns a new FixedHedgeStrategy

func (*FixedHedgeStrategy) NextTry

func (s *FixedHedgeStrategy) NextTry(sz uint64, elapsed time.Duration, n int) time.Duration

NextTry implements HedgeStrategy

type GetRange

type GetRange remotesapi.HttpGetRange

func (*GetRange) Append

func (gr *GetRange) Append(other *GetRange)

func (*GetRange) ChunkByteRange

func (gr *GetRange) ChunkByteRange(i int) (uint64, uint64)

func (*GetRange) ChunkEndOffset

func (gr *GetRange) ChunkEndOffset(i int) uint64

func (*GetRange) ChunkStartOffset

func (gr *GetRange) ChunkStartOffset(i int) uint64

func (*GetRange) GapBetween

func (gr *GetRange) GapBetween(i, j int) uint64

func (*GetRange) GetDownloadFunc

func (gr *GetRange) GetDownloadFunc(ctx context.Context, stats StatsRecorder, fetcher HTTPFetcher, chunkChan chan nbs.CompressedChunk, pathToUrl resourcePathToUrlFunc) func() error

func (*GetRange) NumBytesInRanges

func (gr *GetRange) NumBytesInRanges() uint64

func (*GetRange) NumChunks

func (gr *GetRange) NumChunks() int

func (*GetRange) RangeLen

func (gr *GetRange) RangeLen() uint64

func (*GetRange) ResourcePath

func (gr *GetRange) ResourcePath() string

func (*GetRange) Sort

func (gr *GetRange) Sort()

func (*GetRange) SplitAtGaps

func (gr *GetRange) SplitAtGaps(maxGapBytes uint64) []*GetRange
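
A small hedged helper showing how an existing *GetRange can be sorted, split at large gaps, and inspected chunk by chunk using only the methods listed above; the exact split semantics of SplitAtGaps are inferred from its name.

package example

import (
	"fmt"

	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
)

// describeRanges sorts |gr|, splits it at gaps larger than |maxGap| bytes, and
// prints the offsets covered by each chunk in each resulting sub-range.
func describeRanges(gr *remotestorage.GetRange, maxGap uint64) {
	gr.Sort()
	for _, sub := range gr.SplitAtGaps(maxGap) {
		fmt.Printf("resource %s: %d chunks, %d bytes\n",
			sub.ResourcePath(), sub.NumChunks(), sub.RangeLen())
		for i := 0; i < sub.NumChunks(); i++ {
			fmt.Printf("  chunk %d: start %d, end %d\n",
				i, sub.ChunkStartOffset(i), sub.ChunkEndOffset(i))
		}
	}
}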

type HTTPFetcher

type HTTPFetcher interface {
	Do(req *http.Request) (*http.Response, error)
}
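
*http.Client's Do method matches this interface exactly, so a custom client (for example, one with a tuned timeout) can be injected via WithHTTPFetcher. A brief sketch:

package example

import (
	"net/http"
	"time"

	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
)

// Compile-time check: *http.Client satisfies HTTPFetcher.
var _ remotestorage.HTTPFetcher = (*http.Client)(nil)

// withTunedFetcher swaps in a client with an overall request timeout.
func withTunedFetcher(dcs *remotestorage.DoltChunkStore) *remotestorage.DoltChunkStore {
	return dcs.WithHTTPFetcher(&http.Client{Timeout: 2 * time.Minute})
}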

type HedgeStrategy

type HedgeStrategy interface {
	// NextTry determines how long to wait before hedging a Work's |Run|
	// function. |sz| is the |Size| of the Work, |elapsed| is the time since the
	// first |Run| was called, and |n| is the hedge number starting from 1.
	NextTry(sz uint64, elapsed time.Duration, n int) time.Duration
}

HedgeStrategy is used by Hedger to decide when to launch concurrent |Run| calls.
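
Any type with a matching NextTry method can be used; a minimal illustrative strategy that waits longer before each additional hedge, ignoring the work's size and elapsed time:

package example

import (
	"time"

	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
)

// linearBackoffStrategy waits |base| before the first hedge, 2*|base| before
// the second, and so on.
type linearBackoffStrategy struct {
	base time.Duration
}

func (s linearBackoffStrategy) NextTry(sz uint64, elapsed time.Duration, n int) time.Duration {
	return time.Duration(n) * s.base
}

// Compile-time check that the sketch satisfies remotestorage.HedgeStrategy.
var _ remotestorage.HedgeStrategy = linearBackoffStrategy{}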

type Hedger

type Hedger struct {
	// contains filtered or unexported fields
}

Hedger can |Do| Work, potentially invoking Work |Run|'s more than once concurrently if it is taking longer than |strat|'s |NextTry| duration. Completed Work gets reported to the DurationObserver.

var DownloadHedger *Hedger

func NewHedger

func NewHedger(maxOutstanding int64, strat HedgeStrategy, observer DurationObserver) *Hedger

NewHedger returns a new Hedger. |maxOutstanding| is the maximum number of hedged |Run|'s that can be outstanding at once. If a |Run| would be hedged, but there are already maxOutstanding hedged |Run|'s, no additional hedge is launched. |strat| is the HedgeStrategy to use for this hedger. |observer| is a DurationObserver that will receive |Observe|'s when a Work completes.

func (*Hedger) Do

func (h *Hedger) Do(ctx context.Context, w Work) (interface{}, error)

Do runs |w| to completion, potentially spawning concurrent |Run|'s. Returns the results from the first invocation that completes, and cancels the contexts of all invocations.
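
A hedged end-to-end sketch: build a Hedger with a fixed strategy and a noop observer, then hedge an HTTP GET wrapped in a Work. The URL and the size argument are placeholders supplied by the caller.

package example

import (
	"context"
	"io"
	"net/http"
	"time"

	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
)

// hedgedGet fetches |url|, launching another attempt if the current one has
// not finished within 500ms.
func hedgedGet(ctx context.Context, url string, size uint64) ([]byte, error) {
	hedger := remotestorage.NewHedger(
		8, // at most 8 hedged runs outstanding across all Do calls
		remotestorage.NewFixedHedgeStrategy(500*time.Millisecond),
		remotestorage.NewNoopObserver(),
	)

	res, err := hedger.Do(ctx, remotestorage.Work{
		Run: func(ctx context.Context, n int) (interface{}, error) {
			req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
			if err != nil {
				return nil, err
			}
			resp, err := http.DefaultClient.Do(req)
			if err != nil {
				return nil, err
			}
			defer resp.Body.Close()
			return io.ReadAll(resp.Body)
		},
		Size: size,
	})
	if err != nil {
		return nil, err
	}
	return res.([]byte), nil
}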

type HistogramStatsRecorder

type HistogramStatsRecorder struct {
	// contains filtered or unexported fields
}

func NewHistorgramStatsRecorder

func NewHistorgramStatsRecorder() *HistogramStatsRecorder

func (*HistogramStatsRecorder) RecordDownloadAttemptStart

func (r *HistogramStatsRecorder) RecordDownloadAttemptStart(hedge, retry int, offset, size uint64)

func (*HistogramStatsRecorder) RecordDownloadComplete

func (r *HistogramStatsRecorder) RecordDownloadComplete(hedge, retry int, size uint64, d time.Duration)

func (*HistogramStatsRecorder) RecordTimeToFirstByte

func (r *HistogramStatsRecorder) RecordTimeToFirstByte(hedge, retry int, size uint64, d time.Duration)

func (*HistogramStatsRecorder) WriteSummaryTo

func (r *HistogramStatsRecorder) WriteSummaryTo(w io.Writer) error

type MinHedgeStrategy

type MinHedgeStrategy struct {
	// contains filtered or unexported fields
}

MinHedgeStrategy bounds an underlying strategy's NextTry duration to be above |min|.

func NewMinHedgeStrategy

func NewMinHedgeStrategy(min time.Duration, underlying HedgeStrategy) *MinHedgeStrategy

NewMinHedgeStrategy returns a new MinHedgeStrategy

func (*MinHedgeStrategy) NextTry

func (s *MinHedgeStrategy) NextTry(sz uint64, elapsed time.Duration, n int) time.Duration

NextTry implements HedgeStrategy

type NoopObserver

type NoopObserver struct {
}

NoopObserver is a DurationObserver with a noop |Observe|

func NewNoopObserver

func NewNoopObserver() *NoopObserver

NewNoopObserver returns a new NoopObserver

func (*NoopObserver) Observe

func (*NoopObserver) Observe(sz uint64, n int, d time.Duration, err error)

Observe implements DurationObserver

type NullStatsRecorder

type NullStatsRecorder struct {
}

func (NullStatsRecorder) RecordDownloadAttemptStart

func (NullStatsRecorder) RecordDownloadAttemptStart(hedge, retry int, offset, size uint64)

func (NullStatsRecorder) RecordDownloadComplete

func (NullStatsRecorder) RecordDownloadComplete(hedge, retry int, size uint64, d time.Duration)

func (NullStatsRecorder) RecordTimeToFirstByte

func (NullStatsRecorder) RecordTimeToFirstByte(hedge, retry int, size uint64, d time.Duration)

func (NullStatsRecorder) WriteSummaryTo

func (NullStatsRecorder) WriteSummaryTo(io.Writer) error

type PercentileEstimator

type PercentileEstimator struct {
	Percentile float64
	// contains filtered or unexported fields
}

PercentileEstimator is a DynamicEstimator which puts all |Observe| durations into a histogram and returns the current value of the provided |Percentile| in that histogram for the estimated |Duration|. |sz| is ignored.

func NewPercentileEstimator

func NewPercentileEstimator(low, high time.Duration, perc float64) *PercentileEstimator

NewPercentileEstimator returns an initialized |PercentileEstimator|.

func (*PercentileEstimator) Duration

func (ps *PercentileEstimator) Duration(sz uint64) time.Duration

Duration implements DurationEstimator.

func (*PercentileEstimator) Observe

func (ps *PercentileEstimator) Observe(sz uint64, n int, d time.Duration, err error)

Observe implements DurationObserver.
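
A hedged sketch composing the pieces above into a hedger roughly the way the package's own DownloadHedger is presumably assembled: a percentile-based estimate of expected duration, doubled per hedge attempt, bounded below by a minimum wait. The histogram bounds and the 0-100 percentile scale are assumptions.

package example

import (
	"time"

	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
)

// newDownloadHedger composes the built-in strategies: an estimate-based
// strategy, exponential growth per hedge attempt, and a floor so hedges are
// never fired sooner than |minWait|.
func newDownloadHedger(minWait time.Duration) *remotestorage.Hedger {
	// 95.0 assumes the percentile is expressed on a 0-100 scale.
	est := remotestorage.NewPercentileEstimator(50*time.Millisecond, 1*time.Minute, 95.0)
	strat := remotestorage.NewMinHedgeStrategy(
		minWait,
		remotestorage.NewExponentialHedgeStrategy(
			remotestorage.NewEstimateStrategy(est),
		),
	)
	// The estimator also observes completions, so it doubles as the observer.
	return remotestorage.NewHedger(8, strat, est)
}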

type RpcError

type RpcError struct {
	// contains filtered or unexported fields
}

func NewRpcError

func NewRpcError(err error, rpc, host string, req interface{}) *RpcError

func (*RpcError) Error

func (rpce *RpcError) Error() string

func (*RpcError) FullDetails

func (rpce *RpcError) FullDetails() string
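
A hedged sketch of inspecting an error returned by DoltChunkStore operations using the package's error helpers documented above; the gRPC status code check assumes google.golang.org/grpc/codes.

package example

import (
	"fmt"

	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
	"google.golang.org/grpc/codes"
)

// describeRemoteErr pulls the RPC metadata out of an error produced by the
// remote chunk store, if there is any to pull out.
func describeRemoteErr(err error) {
	if err == nil || !remotestorage.IsChunkStoreRpcErr(err) {
		return
	}

	fmt.Printf("rpc %s against %s failed\n", remotestorage.GetRpc(err), remotestorage.GetHost(err))

	if st := remotestorage.GetStatus(err); st != nil && st.Code() == codes.PermissionDenied {
		fmt.Println("permission denied; check credentials for the remote")
	}

	if reqJSON, jerr := remotestorage.GetJsonEncodedRequest(err); jerr == nil {
		fmt.Println("request:", reqJSON)
	}
}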

type Sizer

type Sizer interface {
	Size() int64
}

type StatsRecorder

type StatsRecorder interface {
	RecordTimeToFirstByte(hedge, retry int, size uint64, d time.Duration)
	RecordDownloadAttemptStart(hedge, retry int, offset, size uint64)
	RecordDownloadComplete(hedge, retry int, size uint64, d time.Duration)
	WriteSummaryTo(io.Writer) error
}

func HistogramStatsRecorderFactory

func HistogramStatsRecorderFactory() StatsRecorder

func NullStatsRecorderFactory

func NullStatsRecorderFactory() StatsRecorder
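
A hedged sketch of the stats hooks: point the package-level StatsFlusher at StatsFlusherToColorError (presumably invoked when the package flushes a recorder), and drive a recorder directly with placeholder values to write a summary.

package example

import (
	"os"
	"time"

	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
)

func init() {
	// Replace the default no-op flusher with the color-error flusher.
	remotestorage.StatsFlusher = remotestorage.StatsFlusherToColorError
}

// summarizeOneDownload records a single hypothetical download attempt and
// writes the summary to stdout.
func summarizeOneDownload() error {
	rec := remotestorage.HistogramStatsRecorderFactory()
	rec.RecordDownloadAttemptStart(0, 0, 0, 4096)
	rec.RecordTimeToFirstByte(0, 0, 4096, 20*time.Millisecond)
	rec.RecordDownloadComplete(0, 0, 4096, 180*time.Millisecond)
	return rec.WriteSummaryTo(os.Stdout)
}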

type Work

type Work struct {
	// Run is the function that will be called by |Hedger.Do|. It will be
	// called at least once, and possibly multiple times depending on how
	// long it takes and the |Hedger|'s |Strategy|.
	Run func(ctx context.Context, n int) (interface{}, error)

	// Size is an integer representation of the size of the work.
	// Potentially used by |Strategy|, not used by |Hedger|.
	Size uint64
}

Work is a description of work that can be hedged. The supplied Run function should expect to potentially be called multiple times concurrently, and it should respect |ctx| cancellation. |Size| will be used to estimate Run's duration. This estimate is also used to determine when a Work's Run should be hedged with concurrent Run call(s).
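
Since Run may be invoked concurrently and must respect |ctx| cancellation, a minimal sketch of a cancellation-aware Work (the per-byte delay simply simulates work of the given size):

package example

import (
	"context"
	"time"

	"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
)

// sleepyWork simulates work of |sz| bytes that aborts promptly when the
// Hedger cancels its context (for example, because another Run finished first).
func sleepyWork(sz uint64, perByte time.Duration) remotestorage.Work {
	return remotestorage.Work{
		Size: sz,
		Run: func(ctx context.Context, n int) (interface{}, error) {
			select {
			case <-time.After(time.Duration(sz) * perByte):
				return n, nil // report which attempt won, for illustration
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		},
	}
}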
