Documentation

Overview
Package cas implements an efficient client for Content Addressable Storage.
Constants

const (
    // MegaByte is 1_048_576 bytes.
    MegaByte = 1024 * 1024

    // DefaultGRPCConcurrentCallsLimit is set arbitrarily to a power of 2.
    DefaultGRPCConcurrentCallsLimit = 256

    // DefaultGRPCBytesLimit is the same as the default gRPC request size limit.
    // See: https://pkg.go.dev/google.golang.org/grpc#MaxCallRecvMsgSize
    DefaultGRPCBytesLimit = 4 * MegaByte

    // DefaultGRPCItemsLimit is a tenth of the max.
    DefaultGRPCItemsLimit = 1000

    // MaxGRPCItems is heuristically (with Google's RBE) set to 10k.
    MaxGRPCItems = 10_000

    // DefaultRPCTimeout is arbitrarily set to what is reasonable for a large action.
    DefaultRPCTimeout = time.Minute

    // DefaultOpenFilesLimit is based on GCS recommendations.
    // See: https://cloud.google.com/compute/docs/disks/optimizing-pd-performance#io-queue-depth
    DefaultOpenFilesLimit = 32

    // DefaultOpenLargeFilesLimit is arbitrarily set.
    DefaultOpenLargeFilesLimit = 2

    // DefaultCompressionSizeThreshold effectively disables compression by default.
    DefaultCompressionSizeThreshold = math.MaxInt64

    // BufferSize is based on GCS recommendations.
    // See: https://cloud.google.com/compute/docs/disks/optimizing-pd-performance#io-size
    BufferSize = 4 * MegaByte
)
Variables

var (
    // ErrNegativeLimit indicates an invalid value that is < 0.
    ErrNegativeLimit = errors.New("cas: limit value must be >= 0")

    // ErrZeroOrNegativeLimit indicates an invalid value that is <= 0.
    ErrZeroOrNegativeLimit = errors.New("cas: limit value must be > 0")
)

var (
    // ErrSkip, when returned by UploadOptions.Prelude, means the file/dir must
    // not be uploaded.
    //
    // Note that if UploadOptions.PreserveSymlinks is true and ErrSkip is
    // returned for a symlink target, but not the symlink itself, then it may
    // result in a dangling symlink.
    ErrSkip = errors.New("skip file")

    // ErrDigestUnknown indicates that the requested digest is unknown.
    // Use errors.Is instead of a direct equality check.
    ErrDigestUnknown = errors.New("the requested digest is unknown")
)
Functions

This section is empty.

Types

type Client
type Client struct {
    // InstanceName is the full name of the RBE instance.
    InstanceName string

    // Config is the configuration that the client was created with.
    Config ClientConfig
    // contains filtered or unexported fields
}
Client is a client for Content Addressable Storage. Create one using NewClient.
Goroutine-safe.
All fields are considered immutable, and should not be changed.
func NewClient

func NewClient(ctx context.Context, conn *grpc.ClientConn, instanceName string) (*Client, error)

NewClient creates a new client with the default configuration. Use client.Dial to create a connection.
func NewClientWithConfig
func NewClientWithConfig(ctx context.Context, conn *grpc.ClientConn, instanceName string, config ClientConfig) (*Client, error)
NewClientWithConfig creates a new client with the given configuration.
func (*Client) Upload
func (c *Client) Upload(ctx context.Context, opt UploadOptions, inputC <-chan *UploadInput) (*UploadResult, error)
Upload uploads all files/directories specified by inputC.
Upload assumes ownership of UploadInputs received from inputC. They must not be mutated after sending.
Close inputC to indicate that there are no more files/dirs to upload. When inputC is closed, Upload finishes uploading the remaining files/dirs and exits successfully.
If ctx is canceled, Upload returns with an error.
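A minimal sketch of a complete upload, assuming the package is imported as cas, conn is an established *grpc.ClientConn (e.g. created via client.Dial), and the instance name and paths are hypothetical:

    client, err := cas.NewClient(ctx, conn, "projects/p/instances/default_instance")
    if err != nil {
        return err
    }
    inputC := make(chan *cas.UploadInput)
    go func() {
        defer close(inputC) // no more files/dirs to upload
        // Ownership of the UploadInput transfers to Upload; do not mutate it after sending.
        inputC <- &cas.UploadInput{Path: "/abs/path/to/dir"}
    }()
    res, err := client.Upload(ctx, cas.UploadOptions{}, inputC)
    if err != nil {
        return err
    }
    fmt.Printf("cache hits: %d, cache misses: %d\n", res.Stats.CacheHits.Digests, res.Stats.CacheMisses.Digests)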
type ClientConfig
type ClientConfig struct {
    // FSConcurrency is the maximum number of concurrent file system operations.
    // TODO(nodir): ensure this does not hurt streaming performance
    FSConcurrency int

    // FSLargeConcurrency is the maximum number of concurrent large file read operations.
    FSLargeConcurrency int

    // SmallFileThreshold is a size threshold to categorize a file as small.
    // Such files are buffered entirely (read only once).
    SmallFileThreshold int64

    // LargeFileThreshold is a size threshold to categorize a file as large. For
    // such files, IO concurrency limits are much tighter and locality is
    // prioritized: the file is read for the first and second times with minimal
    // delay between the two.
    LargeFileThreshold int64

    // FileIOSize is the size of file reads.
    FileIOSize int64

    // CompressedBytestreamThreshold is the minimum blob size to enable compression
    // in ByteStream RPCs.
    // Use 0 to compress all writes, and a negative number to disable compression
    // for all operations.
    // DefaultClientConfig() disables compression by default.
    CompressedBytestreamThreshold int64

    // FindMissingBlobs is configuration for ContentAddressableStorage.FindMissingBlobs RPCs.
    // FindMissingBlobs.MaxSizeBytes is ignored.
    FindMissingBlobs RPCConfig

    // BatchUpdateBlobs is configuration for ContentAddressableStorage.BatchUpdateBlobs RPCs.
    BatchUpdateBlobs RPCConfig

    // ByteStreamWrite is configuration for ByteStream.Write RPCs.
    // ByteStreamWrite.MaxItems is ignored.
    ByteStreamWrite RPCConfig

    // RetryPolicy specifies how to retry requests on transient errors.
    RetryPolicy retry.BackoffPolicy

    // IgnoreCapabilities specifies whether to ignore server-provided capabilities.
    // Capabilities are consulted by default.
    IgnoreCapabilities bool
}
ClientConfig is a config for Client. See DefaultClientConfig() for the default values.
func DefaultClientConfig
func DefaultClientConfig() ClientConfig
DefaultClientConfig returns the default config.
To override a specific value:
cfg := DefaultClientConfig()
// ... mutate cfg ...
client, err := NewClientWithConfig(ctx, conn, instanceName, cfg)
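For instance, a sketch that enables compression for blobs of 1 MiB or larger and validates the result (conn and instanceName are assumed to be set up as above):

    cfg := DefaultClientConfig()
    cfg.CompressedBytestreamThreshold = MegaByte // compress ByteStream writes >= 1 MiB
    if err := cfg.Validate(); err != nil {
        return err // e.g. a limit was set to a negative value
    }
    client, err := NewClientWithConfig(ctx, conn, instanceName, cfg)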
func (*ClientConfig) Validate
func (c *ClientConfig) Validate() error
Validate returns a non-nil error if the config is invalid.
type DigestStat
type DigestStat struct {
    Digests int64 // number of unique digests
    Bytes   int64 // total sum of digest sizes
}
DigestStat is aggregated statistics over a set of digests.
type GRPCConfig
type GRPCConfig struct {
    // ConcurrentCallsLimit sets the upper bound of concurrent calls.
    // Must be > 0.
    ConcurrentCallsLimit int

    // BytesLimit sets the upper bound for the size of each request.
    // Comparisons against this value may not be exact due to padding and other
    // serialization nuances. Clients should choose a value that is sufficiently
    // lower than the max size limit for the corresponding gRPC connection.
    // Must be > 0.
    // This is defined as int rather than int64 because gRPC uses int for its limit.
    BytesLimit int

    // ItemsLimit sets the upper bound for the number of items per request.
    // Must be > 0.
    ItemsLimit int

    // BundleTimeout sets the maximum duration a call is delayed while bundling.
    // Bundling is used to amortize the cost of a gRPC call over time. Instead of
    // sending many requests with few items, bundling attempts to maximize the
    // number of items sent in a single request. This includes waiting for a bit
    // to see if more items are requested.
    BundleTimeout time.Duration

    // Timeout sets the upper bound of the total time spent processing a request.
    // For streaming calls, this applies to each Send/Recv call individually, not
    // the whole streaming session.
    // This does not take into account the time it takes to abort the request upon timeout.
    Timeout time.Duration

    // RetryPolicy sets the retry policy for calls using this config.
    RetryPolicy retry.BackoffPolicy
}
GRPCConfig specifies the configuration for a gRPC endpoint.
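As an illustration, a GRPCConfig could be assembled from the package constants like this (a sketch; the BundleTimeout value is an assumption, not a package default):

    grpcCfg := cas.GRPCConfig{
        ConcurrentCallsLimit: cas.DefaultGRPCConcurrentCallsLimit, // 256 concurrent calls
        BytesLimit:           cas.DefaultGRPCBytesLimit,           // 4 MiB, matching the gRPC default message size
        ItemsLimit:           cas.DefaultGRPCItemsLimit,           // 1000 items per request
        BundleTimeout:        10 * time.Millisecond,               // assumed value for illustration
        Timeout:              cas.DefaultRPCTimeout,               // one minute per Send/Recv
    }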
type IOConfig
type IOConfig struct {
    // ConcurrentWalksLimit sets the upper bound of concurrent filesystem tree traversals.
    // This affects the number of concurrent upload requests for the uploader since
    // each one requires a walk.
    // Must be > 0.
    ConcurrentWalksLimit int

    // ConcurrentWalkerVisits sets the upper bound of concurrent visits per walk.
    // Must be > 0.
    ConcurrentWalkerVisits int

    // OpenFilesLimit sets the upper bound for the number of files being simultaneously processed.
    // Must be > 0.
    OpenFilesLimit int

    // OpenLargeFilesLimit sets the upper bound for the number of large files being
    // simultaneously processed.
    //
    // This value counts towards open files. I.e. the following inequality is
    // always effectively true: OpenFilesLimit >= OpenLargeFilesLimit.
    // Must be > 0.
    OpenLargeFilesLimit int

    // SmallFileSizeThreshold sets the upper bound (inclusive) for the file size
    // to be considered a small file.
    //
    // Files that are larger than this value (medium and large files) are uploaded
    // via the streaming API.
    //
    // Small files are buffered entirely in memory and uploaded via the batching API.
    // However, it is still possible for a file to be small in size, but still
    // result in a request that is larger than the gRPC size limit. In that case,
    // the file is uploaded via the streaming API instead.
    //
    // The amount of memory used to buffer files is affected by this value and
    // OpenFilesLimit as well as bundling limits for gRPC. The uploader will stop
    // buffering once the OpenFilesLimit is reached, before which the number of
    // buffered files is bound by the number of blobs buffered for uploading (and
    // whatever the GC hasn't freed yet). In the extreme case, the number of
    // buffered bytes for small files (not including streaming buffers) equals
    // the concurrency limit for the upload gRPC call, times the bytes limit per
    // call, times this value. Note that the amount of memory used to buffer
    // bytes of generated proto messages is not included in this estimate.
    //
    // Must be >= 0.
    SmallFileSizeThreshold int64

    // LargeFileSizeThreshold sets the lower bound (inclusive) for the file size
    // to be considered a large file.
    // Such files are uploaded in chunks using the file streaming API.
    // Must be >= 0.
    LargeFileSizeThreshold int64

    // CompressionSizeThreshold sets the lower bound for the chunk size before it
    // is subject to compression.
    // A value of 0 enables compression for any chunk size. To disable compression,
    // use math.MaxInt64.
    // Must be >= 0.
    CompressionSizeThreshold int64

    // BufferSize sets the buffer size for IO read/write operations.
    // Must be > 0.
    BufferSize int

    // OptimizeForDiskLocality enables sorting files by path before they are
    // written to disk to optimize for disk locality.
    // Assuming files under the same directory are located close to each other on
    // disk, such files are batched together.
    OptimizeForDiskLocality bool

    // Cache is a read/write cache for digested files.
    // The key is the file path and the associated exclusion filter.
    // The value is a proto message that represents one of repb.SymlinkNode,
    // repb.DirectoryNode, or repb.FileNode.
    // Providing a cache here allows for reusing entries between clients.
    // Cache entries are never evicted, which implies the assumption that the
    // files are never edited during the lifetime of the cache entry.
    Cache sync.Map
}
IOConfig specifies the configuration for IO operations.
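A sketch of an IOConfig built from the package constants; the walk limits and file size thresholds below are assumptions for illustration:

    ioCfg := cas.IOConfig{
        ConcurrentWalksLimit:     8,  // assumed
        ConcurrentWalkerVisits:   32, // assumed
        OpenFilesLimit:           cas.DefaultOpenFilesLimit,           // 32, per GCS recommendations
        OpenLargeFilesLimit:      cas.DefaultOpenLargeFilesLimit,      // 2, counts towards OpenFilesLimit
        SmallFileSizeThreshold:   cas.MegaByte,                        // assumed: files <= 1 MiB are batched
        LargeFileSizeThreshold:   64 * cas.MegaByte,                   // assumed: files >= 64 MiB use tighter IO limits
        CompressionSizeThreshold: cas.DefaultCompressionSizeThreshold, // math.MaxInt64, i.e. compression disabled
        BufferSize:               cas.BufferSize,                      // 4 MiB, per GCS recommendations
    }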
type RPCConfig
type RPCConfig struct {
    // Concurrency is the maximum number of RPCs in flight.
    Concurrency int

    // MaxSizeBytes is the maximum size of the request/response, in bytes.
    MaxSizeBytes int

    // MaxItems is the maximum number of blobs/digests per RPC.
    // Applies only to batch RPCs, such as FindMissingBlobs.
    MaxItems int

    // Timeout is the maximum duration of the RPC.
    Timeout time.Duration
}
RPCConfig is configuration for a particular CAS RPC. Some of the fields might not apply to certain RPCs.
For streaming RPCs, the values apply to individual requests/responses in a stream, not the entire stream.
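For example, a sketch of an RPCConfig for FindMissingBlobs using the package defaults (recall from ClientConfig that MaxSizeBytes is ignored for this particular RPC):

    fmbCfg := cas.RPCConfig{
        Concurrency: cas.DefaultGRPCConcurrentCallsLimit, // up to 256 RPCs in flight
        MaxItems:    cas.DefaultGRPCItemsLimit,           // 1000 digests per batch
        Timeout:     cas.DefaultRPCTimeout,               // one minute per RPC
    }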
type Stats
type Stats struct {
    // BytesRequested is the total number of bytes in a request.
    // It does not necessarily equal the total number of bytes uploaded/downloaded.
    BytesRequested int64

    // LogicalBytesMoved is the amount of BytesRequested that was processed.
    // It cannot be larger than BytesRequested, but may be smaller in case of a
    // partial response.
    LogicalBytesMoved int64

    // TotalBytesMoved is the total number of bytes moved over the wire.
    // This may not be accurate since a gRPC call may be interrupted, in which
    // case this number may be higher than the real one.
    // It may be larger (due to retries) or smaller (due to compression, cache
    // hits, or partial responses) than BytesRequested.
    TotalBytesMoved int64

    // EffectiveBytesMoved is the total number of bytes moved over the wire,
    // excluding retries.
    // This may not be accurate since a gRPC call may be interrupted, in which
    // case this number may be higher than the real one.
    // For failures, this is reported as 0.
    // It may be higher than BytesRequested (compression headers), but never
    // higher than TotalBytesMoved.
    EffectiveBytesMoved int64

    // LogicalBytesCached is the total number of bytes not moved over the wire
    // due to caching (either remotely or locally).
    // For failures, this is reported as 0.
    LogicalBytesCached int64

    // LogicalBytesStreamed is the total number of logical bytes moved by the
    // streaming API.
    // It may be larger (due to retries) or smaller (due to cache hits or partial
    // responses) than the requested size.
    // For failures, this is reported as 0.
    LogicalBytesStreamed int64

    // LogicalBytesBatched is the total number of logical bytes moved by the
    // batching API.
    // It may be larger (due to retries) or smaller (due to cache hits or partial
    // responses) than the requested size.
    // For failures, this is reported as 0.
    LogicalBytesBatched int64

    // InputFileCount is the number of processed regular files.
    InputFileCount int64

    // InputDirCount is the number of processed directories.
    InputDirCount int64

    // InputSymlinkCount is the number of processed symlinks (not the number of
    // symlinks in the uploaded merkle tree, which may be lower).
    InputSymlinkCount int64

    // CacheHitCount is the number of cache hits.
    CacheHitCount int64

    // CacheMissCount is the number of cache misses.
    CacheMissCount int64

    // DigestCount is the number of processed digests.
    DigestCount int64

    // BatchedCount is the number of batched files.
    BatchedCount int64

    // StreamedCount is the number of streamed files.
    // For methods that accept bytes, the value is 1 upon success, 0 otherwise.
    StreamedCount int64
}
Stats represents potential metrics reported by various methods. Not all fields are populated by every method.
type TransferStats
type TransferStats struct {
    CacheHits   DigestStat
    CacheMisses DigestStat
    Streamed    DigestStat // streamed transfers
    Batched     DigestStat // batched transfers
}
TransferStats is upload/download statistics.
type UploadInput
type UploadInput struct {
    // Path to the file or a directory to upload.
    // Must be absolute.
    Path string

    // Allowlist is a filter for files/directories under Path.
    // If a file is not present in the Allowlist and does not reside in a
    // directory present in the Allowlist, then the file is ignored.
    // This is equivalent to deleting all not-matched files/dirs before uploading.
    //
    // Each path in the Allowlist must be relative to UploadInput.Path.
    //
    // Must be empty if Path points to a regular file.
    Allowlist []string

    // Exclude is a file/dir filter. If Exclude is not nil and the absolute path
    // of a file/dir matches this regexp, then the file/dir is skipped.
    // Forward-slash-separated paths are matched against the regexp: Exclude does
    // not have to be conditional on the OS.
    // If the Path is a directory, then the filter is evaluated against each file
    // in the subtree.
    // See ErrSkip comments for more details on semantics regarding excluding symlinks.
    Exclude *regexp.Regexp
    // contains filtered or unexported fields
}
UploadInput specifies a file or directory to upload.
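A sketch of an UploadInput that uploads only parts of a directory and skips temporary files (the paths and pattern are hypothetical):

    input := &cas.UploadInput{
        Path:      "/abs/path/to/out",           // must be absolute
        Allowlist: []string{"bin", "lib/data"},  // relative to Path; everything else is ignored
        Exclude:   regexp.MustCompile(`\.tmp$`), // matched against forward-slash-separated paths
    }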
func (*UploadInput) Digest
func (in *UploadInput) Digest(relPath string) (digest.Digest, error)
Digest returns the digest computed for a file/dir. The relPath is relative to UploadInput.Path. Use "." for the digest of the UploadInput.Path itself.
Digest is safe to call only after the channel returned by DigestsComputed() is closed.
If the digest is unknown, returns (nil, err), where err is ErrDigestUnknown according to errors.Is. If the file is a dangling symlink, then its digest is unknown.
func (*UploadInput) DigestsComputed
func (in *UploadInput) DigestsComputed() <-chan struct{}
DigestsComputed returns a channel which is closed when all digests, including descendants, are computed. It is guaranteed to be closed by the time Client.Upload() returns successfully.
DigestsComputed() is always safe to call.
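A sketch of reading a digest back, assuming in was previously sent to Client.Upload and the call returned successfully (which, per the guarantee above, means the channel is already closed):

    <-in.DigestsComputed() // already closed after a successful Upload
    d, err := in.Digest(".") // digest of UploadInput.Path itself
    if errors.Is(err, cas.ErrDigestUnknown) {
        // e.g. the input was a dangling symlink; its digest was never computed
    } else if err == nil {
        fmt.Printf("root digest: %v\n", d)
    }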
type UploadOptions
type UploadOptions struct {
    // PreserveSymlinks specifies whether to preserve symlinks or convert them
    // to regular files. This doesn't upload the targets of symlinks; the caller
    // needs to specify targets explicitly if those are needed too.
    PreserveSymlinks bool

    // AllowDanglingSymlinks specifies whether to upload dangling links or halt
    // the upload with an error.
    //
    // This field is ignored if PreserveSymlinks is false, which is the default.
    AllowDanglingSymlinks bool

    // Prelude is called for each file/dir to be read and uploaded.
    // If it returns an error which is ErrSkip according to errors.Is, then the
    // file/dir is not processed.
    // If it returns another error, then the upload is halted with that error.
    //
    // Prelude might be called multiple times for the same file if different
    // UploadInputs directly/indirectly refer to the same file, but with different
    // UploadInput.Exclude.
    //
    // Prelude is called from different goroutines.
    Prelude func(absPath string, mode os.FileMode) error
}
UploadOptions is optional configuration for Upload function. The default options are the zero value of this struct.
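As an illustration, a sketch of options that preserve symlinks and use Prelude to skip hidden files (the filtering rule is hypothetical):

    opts := cas.UploadOptions{
        PreserveSymlinks:      true,
        AllowDanglingSymlinks: true, // consulted only because PreserveSymlinks is true
        Prelude: func(absPath string, mode os.FileMode) error {
            // Prelude runs on multiple goroutines; keep it free of shared mutable state.
            if strings.HasPrefix(filepath.Base(absPath), ".") {
                return cas.ErrSkip // hidden files/dirs are not uploaded
            }
            return nil
        },
    }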
type UploadResult
type UploadResult struct {
    Stats TransferStats
    // contains filtered or unexported fields
}
UploadResult is the result of a Client.Upload call. It provides file/dir digests and statistics.