Version: v0.0.0-...-a0a4b67
Published: Jun 9, 2023 License: Apache-2.0 Imports: 32 Imported by: 2

Package cas implements an efficient client for Content Addressable Storage.



const (
	// MegaByte is 1_048_576 bytes.
	MegaByte = 1024 * 1024

	// DefaultGRPCConcurrentCallsLimit is set arbitrarily to a power of 2.
	DefaultGRPCConcurrentCallsLimit = 256

	// DefaultGRPCBytesLimit is the same as the default gRPC request size limit.
	DefaultGRPCBytesLimit = 4 * MegaByte

	// DefaultGRPCItemsLimit is a tenth of MaxGRPCItems.
	DefaultGRPCItemsLimit = 1000

	// MaxGRPCItems is heuristically (with Google's RBE) set to 10k.
	MaxGRPCItems = 10_000

	// DefaultRPCTimeout is arbitrarily set to what is reasonable for a large action.
	DefaultRPCTimeout = time.Minute

	// DefaultOpenFilesLimit is based on GCS recommendations.
	DefaultOpenFilesLimit = 32

	// DefaultOpenLargeFilesLimit is arbitrarily set.
	DefaultOpenLargeFilesLimit = 2

	// DefaultCompressionSizeThreshold is set to math.MaxInt64, which disables compression by default.
	DefaultCompressionSizeThreshold = math.MaxInt64

	// BufferSize is based on GCS recommendations.
	BufferSize = 4 * MegaByte
)


var (
	// ErrNegativeLimit indicates an invalid value that is < 0.
	ErrNegativeLimit = errors.New("cas: limit value must be >= 0")

	// ErrZeroOrNegativeLimit indicates an invalid value that is <= 0.
	ErrZeroOrNegativeLimit = errors.New("cas: limit value must be > 0")
)
var (
	// ErrSkip, when returned by UploadOptions.Prelude, means the file/dir must
	// not be uploaded.
	// Note that if UploadOptions.PreserveSymlinks is true and ErrSkip is
	// returned for a symlink target, but not the symlink itself, then the
	// result may be a dangling symlink.
	ErrSkip = errors.New("skip file")

	// ErrDigestUnknown indicates that the requested digest is unknown.
	// Use errors.Is instead of a direct equality check.
	ErrDigestUnknown = errors.New("the requested digest is unknown")
)




type Client

type Client struct {

	// InstanceName is the full name of the RBE instance.
	InstanceName string

	// Config is the configuration that the client was created with.
	Config ClientConfig
	// contains filtered or unexported fields
}

Client is a client for Content Addressable Storage. Create one using NewClient.


All fields are considered immutable, and should not be changed.

func NewClient

func NewClient(ctx context.Context, conn *grpc.ClientConn, instanceName string) (*Client, error)

NewClient creates a new client with the default configuration. Use client.Dial to create a connection.

func NewClientWithConfig

func NewClientWithConfig(ctx context.Context, conn *grpc.ClientConn, instanceName string, config ClientConfig) (*Client, error)

NewClientWithConfig creates a new client and accepts a configuration.

func (*Client) Upload

func (c *Client) Upload(ctx context.Context, opt UploadOptions, inputC <-chan *UploadInput) (*UploadResult, error)

Upload uploads all files/directories specified by inputC.

Upload assumes ownership of UploadInputs received from inputC. They must not be mutated after sending.

Close inputC to indicate that there are no more files/dirs to upload. When inputC is closed, Upload finishes uploading the remaining files/dirs and exits successfully.

If ctx is canceled, the Upload returns with an error.

type ClientConfig

type ClientConfig struct {
	// FSConcurrency is the maximum number of concurrent file system operations.
	// TODO(nodir): ensure this does not hurt streaming performance
	FSConcurrency int

	// FSLargeConcurrency is the maximum number of concurrent large file read operation.
	FSLargeConcurrency int

	// SmallFileThreshold is a size threshold to categorize a file as small.
	// Such files are buffered entirely (read only once).
	SmallFileThreshold int64

	// LargeFileThreshold is a size threshold to categorize a file as large. For
	// such files, IO concurrency limits are much tighter and locality is
	// prioritized: the file is read for the first and second times with minimal
	// delay between the two.
	LargeFileThreshold int64

	// FileIOSize is the size of file reads.
	FileIOSize int64

	// CompressedBytestreamThreshold is the minimum blob size to enable compression
	// in ByteStream RPCs.
	// Use 0 for all writes being compressed, and a negative number for all operations being
	// uncompressed.
	// DefaultClientConfig() disables compression by default.
	CompressedBytestreamThreshold int64

	// FindMissingBlobs is configuration for ContentAddressableStorage.FindMissingBlobs RPCs.
	// FindMissingBlobs.MaxSizeBytes is ignored.
	FindMissingBlobs RPCConfig

	// BatchUpdateBlobs is configuration for ContentAddressableStorage.BatchUpdateBlobs RPCs.
	BatchUpdateBlobs RPCConfig

	// ByteStreamWrite is configuration for ByteStream.Write RPCs.
	// ByteStreamWrite.MaxItems is ignored.
	ByteStreamWrite RPCConfig

	// RetryPolicy specifies how to retry requests on transient errors.
	RetryPolicy retry.BackoffPolicy

	// IgnoreCapabilities specifies whether to ignore server-provided capabilities.
	// Capabilities are consulted by default.
	IgnoreCapabilities bool
}

ClientConfig is a config for Client. See DefaultClientConfig() for the default values.

func DefaultClientConfig

func DefaultClientConfig() ClientConfig

DefaultClientConfig returns the default config.

To override a specific value:

cfg := DefaultClientConfig()
... mutate cfg ...
client, err := NewClientWithConfig(ctx, conn, instanceName, cfg)

func (*ClientConfig) Validate

func (c *ClientConfig) Validate() error

Validate returns a non-nil error if the config is invalid.

type DigestStat

type DigestStat struct {
	Digests int64 // number of unique digests
	Bytes   int64 // total sum of digest sizes
}


DigestStat is aggregated statistics over a set of digests.

type GRPCConfig

type GRPCConfig struct {
	// ConcurrentCallsLimit sets the upper bound of concurrent calls.
	// Must be > 0.
	ConcurrentCallsLimit int

	// BytesLimit sets the upper bound for the size of each request.
	// Comparisons against this value may not be exact due to padding and other serialization nuances.
	// Clients should choose a value that is sufficiently lower than the max size limit for corresponding gRPC connection.
	// Must be > 0.
	// This is defined as int rather than int64 because gRPC uses int for its limit.
	BytesLimit int

	// ItemsLimit sets the upper bound for the number of items per request.
	// Must be > 0.
	ItemsLimit int

	// BundleTimeout sets the maximum duration a call is delayed while bundling.
	// Bundling is used to amortize the cost of a gRPC call over time. Instead of sending
	// many requests with few items, bundling attempts to maximize the number of items sent in a single request.
	// This includes waiting for a bit to see if more items are requested.
	BundleTimeout time.Duration

	// Timeout sets the upper bound of the total time spent processing a request.
	// For streaming calls, this applies to each Send/Recv call individually, not the whole streaming session.
	// This does not take into account the time it takes to abort the request upon timeout.
	Timeout time.Duration

	// RetryPolicy sets the retry policy for calls using this config.
	RetryPolicy retry.BackoffPolicy
}

GRPCConfig specifies the configuration for a gRPC endpoint.

type IOConfig

type IOConfig struct {
	// ConcurrentWalksLimit sets the upper bound of concurrent filesystem tree traversals.
	// This affects the number of concurrent upload requests for the uploader since each one requires a walk.
	// Must be > 0.
	ConcurrentWalksLimit int

	// ConcurrentWalkerVisits sets the upper bound of concurrent visits per walk.
	// Must be > 0.
	ConcurrentWalkerVisits int

	// OpenFilesLimit sets the upper bound for the number of files being simultaneously processed.
	// Must be > 0.
	OpenFilesLimit int

	// OpenLargeFilesLimit sets the upper bound for the number of large files being simultaneously processed.
	// This value counts towards open files. I.e. the following inequality is always effectively true:
	// OpenFilesLimit >= OpenLargeFilesLimit
	// Must be > 0.
	OpenLargeFilesLimit int

	// SmallFileSizeThreshold sets the upper bound (inclusive) for the file size to be considered a small file.
	// Files that are larger than this value (medium and large files) are uploaded via the streaming API.
	// Small files are buffered entirely in memory and uploaded via the batching API.
	// However, it is still possible for a file to be small in size, but result in a request that is larger than the gRPC size limit.
	// In that case, the file is uploaded via the streaming API instead.
	// The amount of memory used to buffer files is affected by this value and OpenFilesLimit as well as bundling limits for gRPC.
	// The uploader will stop buffering once the OpenFilesLimit is reached, before which the number of buffered files is bound by
	// the number of blobs buffered for uploading (and whatever the GC hasn't freed yet).
	// In the extreme case, the number of buffered bytes for small files (not including streaming buffers) equals
	// the concurrency limit for the upload gRPC call, times the bytes limit per call, times this value.
	// Note that the amount of memory used to buffer bytes of a generated proto messages is not included in this estimate.
	// Must be >= 0.
	SmallFileSizeThreshold int64

	// LargeFileSizeThreshold sets the lower bound (inclusive) for the file size to be considered a large file.
	// Such files are uploaded in chunks using the file streaming API.
	// Must be >= 0.
	LargeFileSizeThreshold int64

	// CompressionSizeThreshold sets the lower bound for the chunk size before it is subject to compression.
	// A value of 0 enables compression for any chunk size. To disable compression, use math.MaxInt64.
	// Must be >= 0.
	CompressionSizeThreshold int64

	// BufferSize sets the buffer size for IO read/write operations.
	// Must be > 0.
	BufferSize int

	// OptimizeForDiskLocality enables sorting files by path before they are written to disk to optimize for disk locality.
	// Assuming files under the same directory are located close to each other on disk, then such files are batched together.
	OptimizeForDiskLocality bool

	// Cache is a read/write cache for digested files.
	// The key is the file path and the associated exclusion filter.
	// The value is a proto message that represents one of repb.SymlinkNode, repb.DirectoryNode, repb.FileNode.
	// Providing a cache here allows for reusing entries between clients.
	// Cache entries are never evicted which implies the assumption that the files are never edited during the lifetime of the cache entry.
	Cache sync.Map
}

IOConfig specifies the configuration for IO operations.

type RPCConfig

type RPCConfig struct {
	// Concurrency is the maximum number of RPCs in flight.
	Concurrency int

	// MaxSizeBytes is the maximum size of the request/response, in bytes.
	MaxSizeBytes int

	// MaxItems is the maximum number of blobs/digests per RPC.
	// Applies only to batch RPCs, such as FindMissingBlobs.
	MaxItems int

	// Timeout is the maximum duration of the RPC.
	Timeout time.Duration
}

RPCConfig is configuration for a particular CAS RPC. Some of the fields might not apply to certain RPCs.

For streaming RPCs, the values apply to individual requests/responses in a stream, not the entire stream.

type Stats

type Stats struct {
	// BytesRequested is the total number of bytes in a request.
	// It does not necessarily equal the total number of bytes uploaded/downloaded.
	BytesRequested int64

	// LogicalBytesMoved is the amount of BytesRequested that was processed.
	// It cannot be larger than BytesRequested, but may be smaller in case of a partial response.
	LogicalBytesMoved int64

	// TotalBytesMoved is the total number of bytes moved over the wire.
	// This may not be accurate since a gRPC call may be interrupted in which case this number may be higher than the real one.
	// It may be larger than BytesRequested (retries) or smaller than it (compression, cache hits or partial response).
	TotalBytesMoved int64

	// EffectiveBytesMoved is the total number of bytes moved over the wire, excluding retries.
	// This may not be accurate since a gRPC call may be interrupted in which case this number may be higher than the real one.
	// For failures, this is reported as 0.
	// It may be higher than BytesRequested (compression headers), but never higher than TotalBytesMoved.
	EffectiveBytesMoved int64

	// LogicalBytesCached is the total number of bytes not moved over the wire due to caching (either remotely or locally).
	// For failures, this is reported as 0.
	LogicalBytesCached int64

	// LogicalBytesStreamed is the total number of logical bytes moved by the streaming API.
	// It may be larger than (retries) or smaller than (cache hits or partial response) the requested size.
	// For failures, this is reported as 0.
	LogicalBytesStreamed int64

	// LogicalBytesBatched is the total number of logical bytes moved by the batching API.
	// It may be larger than (retries) or smaller than (cache hits or partial response) the requested size.
	// For failures, this is reported as 0.
	LogicalBytesBatched int64

	// InputFileCount is the number of processed regular files.
	InputFileCount int64

	// InputDirCount is the number of processed directories.
	InputDirCount int64

	// InputSymlinkCount is the number of processed symlinks (not the number of symlinks in the uploaded merkle tree which may be lower).
	InputSymlinkCount int64

	// CacheHitCount is the number of cache hits.
	CacheHitCount int64

	// CacheMissCount is the number of cache misses.
	CacheMissCount int64

	// DigestCount is the number of processed digests.
	DigestCount int64

	// BatchedCount is the number of batched files.
	BatchedCount int64

	// StreamedCount is the number of streamed files.
	// For methods that accept bytes, the value is 1 upon success, 0 otherwise.
	StreamedCount int64
}

Stats represents potential metrics reported by various methods. Not all fields are populated by every method.

func (*Stats) Add

func (s *Stats) Add(other Stats)

Add mutates the stats by adding all the corresponding fields of the specified instance.

type TransferStats

type TransferStats struct {
	CacheHits   DigestStat
	CacheMisses DigestStat

	Streamed DigestStat // streamed transfers
	Batched  DigestStat // batched transfers
}

TransferStats is upload/download statistics.

type UploadInput

type UploadInput struct {
	// Path to the file or a directory to upload.
	// Must be absolute.
	Path string

	// Allowlist is a filter for files/directories under Path.
	// If a file is not present in Allowlist and does not reside in a directory
	// present in the Allowlist, then the file is ignored.
	// This is equivalent to deleting all not-matched files/dirs before
	// uploading.
	// Each path in the Allowlist must be relative to UploadInput.Path.
	// Must be empty if Path points to a regular file.
	Allowlist []string

	// Exclude is a file/dir filter. If Exclude is not nil and the
	// absolute path of a file/dir matches this regexp, then the file/dir is skipped.
	// Forward-slash-separated paths are matched against the regexp, so the
	// filter does not have to be conditional on the OS.
	// If the Path is a directory, then the filter is evaluated against each file
	// in the subtree.
	// See ErrSkip comments for more details on semantics regarding excluding symlinks.
	Exclude *regexp.Regexp
	// contains filtered or unexported fields
}

UploadInput specifies a file or directory to upload.

func (*UploadInput) Digest

func (in *UploadInput) Digest(relPath string) (digest.Digest, error)

Digest returns the digest computed for a file/dir. The relPath is relative to UploadInput.Path. Use "." for the digest of the UploadInput.Path itself.

Digest is safe to call only after the channel returned by DigestsComputed() is closed.

If the digest is unknown, returns (nil, err), where err is ErrDigestUnknown according to errors.Is. If the file is a dangling symlink, then its digest is unknown.

func (*UploadInput) DigestsComputed

func (in *UploadInput) DigestsComputed() <-chan struct{}

DigestsComputed returns a channel which is closed when all digests, including descendants, are computed. It is guaranteed to be closed by the time Client.Upload() returns successfully.

DigestsComputed() is always safe to call.

type UploadOptions

type UploadOptions struct {
	// PreserveSymlinks specifies whether to preserve symlinks or convert them
	// to regular files. This does not upload the targets of symlinks; the
	// caller needs to specify targets explicitly if those are necessary too.
	PreserveSymlinks bool

	// AllowDanglingSymlinks specifies whether to upload dangling links or halt
	// the upload with an error.
	// This field is ignored if PreserveSymlinks is false, which is the default.
	AllowDanglingSymlinks bool

	// Prelude is called for each file/dir to be read and uploaded.
	// If it returns an error which is ErrSkip according to errors.Is, then the
	// file/dir is not processed.
	// If it returns another error, then the upload is halted with that error.
	// Prelude might be called multiple times for the same file if different
	// UploadInputs directly/indirectly refer to the same file, but with different
	// UploadInput.Exclude.
	// Prelude is called from different goroutines.
	Prelude func(absPath string, mode os.FileMode) error
}

UploadOptions is optional configuration for the Upload function. The default options are the zero value of this struct.

type UploadResult

type UploadResult struct {
	Stats TransferStats
	// contains filtered or unexported fields
}

UploadResult is the result of a Client.Upload call. It provides file/dir digests and statistics.
