Documentation ¶
Index ¶
- Constants
- Variables
- func CreateContext(ctx context.Context) createOpt
- func CreatePriHigh(o *createOptions)
- func CreatePriLow(o *createOptions)
- func CreatePriMedium(o *createOptions)
- func IsRetriableError(err error) bool
- func OpenContext(ctx context.Context) openOpt
- func OpenPriHigh(o *openOptions)
- func OpenPriLow(o *openOptions)
- func OpenPriMedium(o *openOptions)
- func ReplFactor(n int) createOpt
- func StorageCold(o *createOptions)
- func StorageDefault(o *createOptions)
- func StorageHot(o *createOptions)
- func StorageWarm(o *createOptions)
- func WithExpires(e time.Time) createOpt
- type Blob
- func (b *Blob) ByteLength() (int64, error)
- func (b *Blob) ID() BlobID
- func (b *Blob) Read(p []byte) (int, error)
- func (b *Blob) ReadAt(p []byte, offset int64) (int, error)
- func (b *Blob) Seek(offset int64, whence int) (int64, error)
- func (b *Blob) Stat() (core.BlobInfo, error)
- func (b *Blob) Write(p []byte) (int, error)
- func (b *Blob) WriteAt(p []byte, offset int64) (int, error)
- type BlobID
- type BlobIterator
- type Client
- func (cli *Client) ClusterID() string
- func (cli *Client) Create(opts ...createOpt) (*Blob, error)
- func (cli *Client) Delete(ctx context.Context, id BlobID) error
- func (cli *Client) EnableCache(cacheEnabled bool)
- func (cli *Client) GetTracts(ctx context.Context, id BlobID, start, end int) ([]core.TractInfo, error)
- func (cli *Client) ListBlobs(ctx context.Context) BlobIterator
- func (cli *Client) Open(id BlobID, mode string, opts ...openOpt) (*Blob, error)
- func (cli *Client) SetMetadata(ctx context.Context, id BlobID, metadata core.BlobInfo) error
- func (cli *Client) Undelete(ctx context.Context, id BlobID) error
- type CuratorTalker
- type MasterConnection
- type Options
- type RPCCuratorTalker
- func (r *RPCCuratorTalker) AckExtendBlob(ctx context.Context, addr string, blob core.BlobID, tracts []core.TractInfo) core.Error
- func (r *RPCCuratorTalker) CreateBlob(ctx context.Context, addr string, metadata core.BlobInfo) (core.BlobID, core.Error)
- func (r *RPCCuratorTalker) DeleteBlob(ctx context.Context, addr string, blob core.BlobID) core.Error
- func (r *RPCCuratorTalker) ExtendBlob(ctx context.Context, addr string, blob core.BlobID, numTracts int) ([]core.TractInfo, core.Error)
- func (r *RPCCuratorTalker) FixVersion(ctx context.Context, addr string, tract core.TractInfo, bad string) core.Error
- func (r *RPCCuratorTalker) GetTracts(ctx context.Context, addr string, blob core.BlobID, start, end int, ...) ([]core.TractInfo, core.Error)
- func (r *RPCCuratorTalker) ListBlobs(ctx context.Context, addr string, partition core.PartitionID, ...) ([]core.BlobKey, core.Error)
- func (r *RPCCuratorTalker) ReportBadTS(ctx context.Context, addr string, id core.TractID, bad, op string, ...) core.Error
- func (r *RPCCuratorTalker) SetMetadata(ctx context.Context, addr string, blob core.BlobID, md core.BlobInfo) core.Error
- func (r *RPCCuratorTalker) StatBlob(ctx context.Context, addr string, blob core.BlobID) (core.BlobInfo, core.Error)
- func (r *RPCCuratorTalker) UndeleteBlob(ctx context.Context, addr string, blob core.BlobID) core.Error
- type RPCMasterConnection
- func (r *RPCMasterConnection) GetTractserverInfo(ctx context.Context) (info []core.TractserverInfo, err core.Error)
- func (r *RPCMasterConnection) ListPartitions(ctx context.Context) (partitions []core.PartitionID, err core.Error)
- func (r *RPCMasterConnection) LookupPartition(ctx context.Context, partition core.PartitionID) (curator string, err core.Error)
- func (r *RPCMasterConnection) MasterCreateBlob(ctx context.Context) (curator string, err core.Error)
- type RPCTractserverTalker
- func (r *RPCTractserverTalker) Create(ctx context.Context, addr string, tsid core.TractserverID, id core.TractID, ...) core.Error
- func (r *RPCTractserverTalker) GetDiskInfo(ctx context.Context, addr string) ([]core.FsStatus, core.Error)
- func (r *RPCTractserverTalker) Read(ctx context.Context, addr string, id core.TractID, version int, len int, ...) ([]byte, core.Error)
- func (r *RPCTractserverTalker) ReadInto(ctx context.Context, addr string, id core.TractID, version int, b []byte, ...) (int, core.Error)
- func (r *RPCTractserverTalker) SetControlFlags(ctx context.Context, addr string, root string, flags core.DiskControlFlags) core.Error
- func (r *RPCTractserverTalker) StatTract(ctx context.Context, addr string, id core.TractID, version int) (int64, core.Error)
- func (r *RPCTractserverTalker) Write(ctx context.Context, addr string, id core.TractID, version int, b []byte, ...) core.Error
- type ReadaheadBlob
- type ReconstructBehavior
- type TractserverTalker
Constants ¶
const ( // ParallelRPCs is the number of read/write rpcs to issue in parallel per // call to blob.Read or Write. (Note that this is not a global limit. Each // call may issue up to this many RPCs in parallel.) ParallelRPCs = 12 // LookupCacheSize is the number of partitions for which we cache the // current curator. We don't expect that many partitions so we might as well // cache them all. LookupCacheSize = 100 // TractCacheSize is the number of _blobs_ for which we cache tract // locations. (Large blobs may use more space in the cache, there's no // weight tracking.) TractCacheSize = 100 // TractLength defines the size of a full tract. TractLength = core.TractLength )
Variables ¶
var ErrInvalidBlobID = errors.New("invalid blob ID format")
ErrInvalidBlobID is the string representation that the blob ID is invalid.
Functions ¶
func CreateContext ¶
CreateContext associates a context with this Create call.
func CreatePriHigh ¶
func CreatePriHigh(o *createOptions)
CreatePriHigh gives high priority to all disk operations related to this blob.
func CreatePriLow ¶
func CreatePriLow(o *createOptions)
CreatePriLow gives low priority to all disk operations related to this blob.
func CreatePriMedium ¶
func CreatePriMedium(o *createOptions)
CreatePriMedium gives medium priority to all disk operations related to this blob.
func IsRetriableError ¶
IsRetriableError returns if the error is a retriable error. It assumes the error is not nil.
func OpenContext ¶
OpenContext associates a context with this Open call.
func OpenPriHigh ¶
func OpenPriHigh(o *openOptions)
OpenPriHigh gives high priority to all disk operations related to this blob.
func OpenPriLow ¶
func OpenPriLow(o *openOptions)
OpenPriLow gives low priority to all disk operations related to this blob.
func OpenPriMedium ¶
func OpenPriMedium(o *openOptions)
OpenPriMedium gives medium priority to all disk operations related to this blob.
func ReplFactor ¶
func ReplFactor(n int) createOpt
ReplFactor causes the blob to be created with replication factor n.
func StorageCold ¶
func StorageCold(o *createOptions)
StorageCold causes the blob to be created with an intention for cold storage.
func StorageDefault ¶
func StorageDefault(o *createOptions)
StorageDefault causes the blob to be created with the default storage hint.
func StorageHot ¶
func StorageHot(o *createOptions)
StorageHot causes the blob to be created with an intention for hot storage.
func StorageWarm ¶
func StorageWarm(o *createOptions)
StorageWarm causes the blob to be created with an intention for warm storage.
func WithExpires ¶
WithExpires causes the blob to be created with an expiration time.
Types ¶
type Blob ¶
type Blob struct {
// contains filtered or unexported fields
}
Blob is an implementation of the 'io.ReadWriteSeeker' interface and is built on top of the client driver. It provides an object handle for the stored blobs. This is not thread-safe. PL-1157: (1) Should we make it thread-safe or leave the user to handle it? (2) Optimize the implementation to reduce number of rpcs to the curators.
func (*Blob) ByteLength ¶
ByteLength returns the length of the blob in bytes.
func (*Blob) Read ¶
Read reads up to 'len(p)' bytes from 'b' into 'p'. It returns the number of bytes read and any error encountered.
func (*Blob) ReadAt ¶
ReadAt reads up to 'len(p)' bytes from the 'offset' of 'b' into 'p'. It returns the number of bytes read and any error encountered. Different from Read, this method doesn't change the internal offset of 'b' for the next Read or Write.
func (*Blob) Seek ¶
Seek sets the offset for the next Read or Write to 'offset', interpreted according to 'whence': 0 means relative to the origin of the blob, 1 means relative to the current offset, and 2 means relative to the end. Seeking with respect to the end is supported but in no means optimized. Seek returns the new offset and an error, if any.
Seeking to a negative offset is an error. Seeking to any positive offset is legal, but subsequent I/O operations on the underlying object can return errors if the offset is out of bound.
func (*Blob) Write ¶
Write writes 'len(p)' bytes to 'b'. It returns the number of bytes written from 'p' and any error encountered that caused the write to stop early.
func (*Blob) WriteAt ¶
WriteAt writes 'len(p)' bytes to 'b' starting at the 'offset'. It returns the number of bytes written from 'p' and any error encountered that caused the write to stop early. Different from Write, this method doesn't change the internal offset of 'b' for the next Write or Read.
type BlobID ¶
BlobID refers to a Blob stored in Blb.
const NilBlobID BlobID = 0
NilBlobID means the ID is invalid.
func ParseBlobID ¶
ParseBlobID parses a BlobID from the provided string. The string must be in the format produced by BlobID.String(). If it is not, ErrInvalidID will be returned.
type BlobIterator ¶
BlobIterator is a function that returns BlobIDs in batches.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client exposes a simple interface to Blb users for requesting services and handles communications with master, curator and tractservers under the hood.
func NewClient ¶
NewClient returns a new Client that can be used to interact with a Blob cell. You must pass an Option object that contains the configuration of the client.
func NewMockClient ¶
func NewMockClient() *Client
NewMockClient returns a new in-memory mock client that can only be used for testing.
func (*Client) Create ¶
Create creates a blob with the given options. It will retry the operation internally according to the retry policy specified by users if the operation failed due to "retriable" errors.
func (*Client) Delete ¶
Delete deletes 'blob'. It will retry the operation internally according to the retry policy specified by users if the operation failed due to "retriable" errors.
func (*Client) EnableCache ¶
EnableCache turns on/off caching.
func (*Client) GetTracts ¶
func (cli *Client) GetTracts(ctx context.Context, id BlobID, start, end int) ([]core.TractInfo, error)
GetTracts returns a subset of the tracts for the blob. It will retry the operation internally according to the retry policy specified by users if the operation failed due to "retriable" errors.
func (*Client) ListBlobs ¶
func (cli *Client) ListBlobs(ctx context.Context) BlobIterator
ListBlobs returns an iterator that lists all existing blobs, in batches. Clients should keep calling the iterator until it return nil, or an error. The iterator should be used in a single-threaded manner.
ListBlobs is not guaranteed to return all blobs if cluster membership or raft leadership changes during iteration. It's intended for informative and diagnostic use only.
func (*Client) Open ¶
Open opens a blob referenced by 'id'. It will retry the operation internally according to the retry policy specified by users if the operation failed due to "retriable" errors. 'mode' should be a combination of "r" for reading, "w" for writing, and "s" for statting.
func (*Client) SetMetadata ¶
SetMetadata allows changing various fields of blob metadata. Currently changing the storage hint, mtime, atime are supported.
type CuratorTalker ¶
type CuratorTalker interface { // CreateBlob creates a blob. Only Repl, Hint, and Expires in options are used. CreateBlob(ctx context.Context, addr string, metadata core.BlobInfo) (core.BlobID, core.Error) // ExtendBlob extends 'blob' until it has 'numTracts' tracts, and returns // the new tracts. ExtendBlob(ctx context.Context, addr string, blob core.BlobID, numTracts int) ([]core.TractInfo, core.Error) // AckExtendBlob acks the success of extending 'blob' with the new // tracts 'tracts'. AckExtendBlob(ctx context.Context, addr string, blob core.BlobID, tracts []core.TractInfo) core.Error // DeleteBlob deletes 'blob'. DeleteBlob(ctx context.Context, addr string, blob core.BlobID) core.Error // Undelete tries to un-delete 'blob'. UndeleteBlob(ctx context.Context, addr string, blob core.BlobID) core.Error // SetMetadata changes some metadata for the given blob. SetMetadata(ctx context.Context, addr string, blob core.BlobID, md core.BlobInfo) core.Error // GetTracts retrieves the tracts ['start', 'end') for 'blob'. GetTracts(ctx context.Context, addr string, blob core.BlobID, start, end int, forRead, forWrite bool) ([]core.TractInfo, core.Error) // StatBlob gets information about a blob. StatBlob(ctx context.Context, addr string, blob core.BlobID) (core.BlobInfo, core.Error) // ReportBadTS asks the curator to re-replicate or recover the provided tract or chunk as // the host 'bad' is unavailable. The curator may ignore this, or it may re-replicate. ReportBadTS(ctx context.Context, addr string, id core.TractID, bad, op string, got core.Error, couldRecover bool) core.Error // FixVersion asks the curator at 'addr' to check the version(s) of the provided // tract. Sent when a client attemped an I/O to the tractserver at 'bad' with the // tract in 'tract' but had a version mismatch error. FixVersion(ctx context.Context, addr string, tract core.TractInfo, bad string) core.Error // ListBlobs gets a contiguous range of blob ids in a given partition, whose // keys (lower part of id) are >= 'start'. The server may choose how many to // return at once, but it will be at least one if any exist. The keys will // be a contiguous range of the id space, and will be in order. An empty // return value means that no blobs in that part of the id space exist. ListBlobs(ctx context.Context, addr string, partition core.PartitionID, start core.BlobKey) ([]core.BlobKey, core.Error) }
CuratorTalker manages connections to curators.
func NewRPCCuratorTalker ¶
func NewRPCCuratorTalker() CuratorTalker
NewRPCCuratorTalker creates a new RPCCuratorTalker.
type MasterConnection ¶
type MasterConnection interface { // MasterCreatelob returns a proper curator to create a new blob. MasterCreateBlob(ctx context.Context) (curator string, err core.Error) // LookupPartition returns which curator is responsible for 'partition'. LookupPartition(ctx context.Context, part core.PartitionID) (curator string, err core.Error) // ListPartitions returns all existing partitions. ListPartitions(ctx context.Context) (partitions []core.PartitionID, err core.Error) // GetTractserverInfo gets tractserver state. GetTractserverInfo(ctx context.Context) (info []core.TractserverInfo, err core.Error) }
MasterConnection is a connection to master.
func NewRPCMasterConnection ¶
func NewRPCMasterConnection(addrSpec string) MasterConnection
NewRPCMasterConnection creates a new RPCMasterConnection to the master set with addresses in 'addrSpec'.
type Options ¶
type Options struct { // The cluster this client will connect to. Most users can use a simple // cluster name here. This may also be a cluster/user/service string, to // fully specify a service discovery record, or a comma-separated list of // master hostnames. If empty, we will try to connect to the default cluster // (but this is not recommended; try to use an explicit cluster name if // possible). Cluster string // Whether client will retry operations failed due to "retriable" erorr. DisableRetry bool // RetryTimeout bounds the total time of retries if it's greater than zero. RetryTimeout time.Duration // Whether client will use cache. DisableCache bool // An optional label to diffrentiate metrics from differen client instances. // It will be "default" if it's not specified. Instance string // How the client decides whether to attempt client-side RS reconstruction. ReconstructBehavior ReconstructBehavior }
Options contains configurations of a client.
type RPCCuratorTalker ¶
type RPCCuratorTalker struct {
// contains filtered or unexported fields
}
RPCCuratorTalker implmenets CuratorTalker based on Go's RPC package.
func (*RPCCuratorTalker) AckExtendBlob ¶
func (r *RPCCuratorTalker) AckExtendBlob(ctx context.Context, addr string, blob core.BlobID, tracts []core.TractInfo) core.Error
AckExtendBlob acks the success of extending 'blob' with the new tracts 'tracts' and term number 'term'.
func (*RPCCuratorTalker) CreateBlob ¶
func (r *RPCCuratorTalker) CreateBlob(ctx context.Context, addr string, metadata core.BlobInfo) (core.BlobID, core.Error)
CreateBlob implements CuratorTalker.
func (*RPCCuratorTalker) DeleteBlob ¶
func (r *RPCCuratorTalker) DeleteBlob(ctx context.Context, addr string, blob core.BlobID) core.Error
DeleteBlob implements CuratorTalker.
func (*RPCCuratorTalker) ExtendBlob ¶
func (r *RPCCuratorTalker) ExtendBlob(ctx context.Context, addr string, blob core.BlobID, numTracts int) ([]core.TractInfo, core.Error)
ExtendBlob implements CuratorTalker.
func (*RPCCuratorTalker) FixVersion ¶
func (r *RPCCuratorTalker) FixVersion(ctx context.Context, addr string, tract core.TractInfo, bad string) core.Error
FixVersion asks the curator to verify the version(s) of the provided tract as we were unable to use the information in 'tract' to do IO with 'bad'.
func (*RPCCuratorTalker) GetTracts ¶
func (r *RPCCuratorTalker) GetTracts(ctx context.Context, addr string, blob core.BlobID, start, end int, forRead, forWrite bool) ([]core.TractInfo, core.Error)
GetTracts implements CuratorTalker.
func (*RPCCuratorTalker) ListBlobs ¶
func (r *RPCCuratorTalker) ListBlobs(ctx context.Context, addr string, partition core.PartitionID, start core.BlobKey) ([]core.BlobKey, core.Error)
ListBlobs gets a contiguous range of blob ids in a given partition.
func (*RPCCuratorTalker) ReportBadTS ¶
func (r *RPCCuratorTalker) ReportBadTS(ctx context.Context, addr string, id core.TractID, bad, op string, got core.Error, couldRecover bool) core.Error
ReportBadTS tells the curator to re-replicate 'id' replacing 'bad'.
func (*RPCCuratorTalker) SetMetadata ¶
func (r *RPCCuratorTalker) SetMetadata(ctx context.Context, addr string, blob core.BlobID, md core.BlobInfo) core.Error
SetMetadata implements CuratorTalker.
func (*RPCCuratorTalker) StatBlob ¶
func (r *RPCCuratorTalker) StatBlob(ctx context.Context, addr string, blob core.BlobID) (core.BlobInfo, core.Error)
StatBlob implements CuratorTalker.
func (*RPCCuratorTalker) UndeleteBlob ¶
func (r *RPCCuratorTalker) UndeleteBlob(ctx context.Context, addr string, blob core.BlobID) core.Error
UndeleteBlob implements CuratorTalker.
type RPCMasterConnection ¶
type RPCMasterConnection struct {
// contains filtered or unexported fields
}
RPCMasterConnection implements MasterConnection based on Go's RPC pacakge.
func (*RPCMasterConnection) GetTractserverInfo ¶
func (r *RPCMasterConnection) GetTractserverInfo(ctx context.Context) (info []core.TractserverInfo, err core.Error)
GetTractserverInfo gets tractserver state.
func (*RPCMasterConnection) ListPartitions ¶
func (r *RPCMasterConnection) ListPartitions(ctx context.Context) (partitions []core.PartitionID, err core.Error)
ListPartitions returns all existing partitions.
func (*RPCMasterConnection) LookupPartition ¶
func (r *RPCMasterConnection) LookupPartition(ctx context.Context, partition core.PartitionID) (curator string, err core.Error)
LookupPartition calls master 'Lookup' to locate the curator for 'partition'.
func (*RPCMasterConnection) MasterCreateBlob ¶
func (r *RPCMasterConnection) MasterCreateBlob(ctx context.Context) (curator string, err core.Error)
MasterCreateBlob calls master 'Create' to choose a curator to create a blob.
type RPCTractserverTalker ¶
type RPCTractserverTalker struct {
// contains filtered or unexported fields
}
RPCTractserverTalker implements TractserverTalker based on Go RPC.
func (*RPCTractserverTalker) Create ¶
func (r *RPCTractserverTalker) Create(ctx context.Context, addr string, tsid core.TractserverID, id core.TractID, b []byte, off int64) core.Error
Create creates a new tract on the tractserver and does a write to the newly created tract.
func (*RPCTractserverTalker) GetDiskInfo ¶
func (r *RPCTractserverTalker) GetDiskInfo(ctx context.Context, addr string) ([]core.FsStatus, core.Error)
GetDiskInfo returns a summary of disk info.
func (*RPCTractserverTalker) Read ¶
func (r *RPCTractserverTalker) Read(ctx context.Context, addr string, id core.TractID, version int, len int, off int64) ([]byte, core.Error)
Read reads from a tract.
func (*RPCTractserverTalker) ReadInto ¶
func (r *RPCTractserverTalker) ReadInto(ctx context.Context, addr string, id core.TractID, version int, b []byte, off int64) (int, core.Error)
ReadInto reads from a tract into a provided slice without copying.
func (*RPCTractserverTalker) SetControlFlags ¶
func (r *RPCTractserverTalker) SetControlFlags(ctx context.Context, addr string, root string, flags core.DiskControlFlags) core.Error
SetControlFlags changes control flags for a disk.
type ReadaheadBlob ¶
type ReadaheadBlob struct {
// contains filtered or unexported fields
}
ReadaheadBlob wraps an existing Blob for read and tries to prefetch the data for better performance. It implements io.ReadSeeker interface.
func NewReadaheadBlob ¶
func NewReadaheadBlob(b *Blob) *ReadaheadBlob
NewReadaheadBlob wraps an existing blob and does readahead on top of that.
func (*ReadaheadBlob) ByteLength ¶
func (b *ReadaheadBlob) ByteLength() (int64, error)
ByteLength returns the length of the blob in bytes.
func (*ReadaheadBlob) Read ¶
func (b *ReadaheadBlob) Read(p []byte) (int, error)
Read does the same thing as Blob.Read does.
func (*ReadaheadBlob) Seek ¶
func (b *ReadaheadBlob) Seek(offset int64, whence int) (int64, error)
Seek does the same thing as Blob.Seek does. When a seek is performed all the prefetched data will be discarded. Since internally ReadaheadBlob keeps a buffer with the size of one tract so presumably it will issue just one read for each tract. But if you seek to the middle of a tract then the buffer will not be aligned with a tract anymore, thus it might end up issuing more than one read for a tract.
TODO: we might want to optimize this by duplicating the bufio logic here.
type ReconstructBehavior ¶
ReconstructBehavior lets clients control when the client attempts client-side RS reconstruction (instead of just retrying and waiting for the curator to do it).
type TractserverTalker ¶
type TractserverTalker interface { // Create creates a new tract on the tractserver and does a write to the // newly created tract. Create(ctx context.Context, addr string, tsid core.TractserverID, id core.TractID, b []byte, off int64) core.Error // Write does a write to a tract on this tractserver. Write(ctx context.Context, addr string, id core.TractID, version int, b []byte, off int64) core.Error // Read reads from a tract. Read(ctx context.Context, addr string, id core.TractID, version int, len int, off int64) ([]byte, core.Error) // ReadInto reads from a tract. It will try to read up to len(b) bytes and put them in b. // It returns the number of bytes read, as in io.Reader's Read. ReadInto(ctx context.Context, addr string, id core.TractID, version int, b []byte, off int64) (int, core.Error) // StatTract returns the number of bytes in a tract. StatTract(ctx context.Context, addr string, id core.TractID, version int) (int64, core.Error) // GetDiskInfo returns a summary of disk info. GetDiskInfo(ctx context.Context, addr string) ([]core.FsStatus, core.Error) // SetControlFlags changes control flags for a disk. SetControlFlags(ctx context.Context, addr string, root string, flags core.DiskControlFlags) core.Error }
TractserverTalker manages connections to tractservers.
func NewRPCTractserverTalker ¶
func NewRPCTractserverTalker() TractserverTalker
NewRPCTractserverTalker returns a new Golang RPC based implementation of TractserverTalker.