blb

package
Version v0.0.0-...-fd5963e. This package is not in the latest version of its module.

Published: Mar 13, 2019 License: MIT Imports: 25 Imported by: 0

Documentation

Constants

const (
	// ParallelRPCs is the number of read/write rpcs to issue in parallel per
	// call to blob.Read or Write. (Note that this is not a global limit. Each
	// call may issue up to this many RPCs in parallel.)
	ParallelRPCs = 12

	// LookupCacheSize is the number of partitions for which we cache the
	// current curator. We don't expect that many partitions so we might as well
	// cache them all.
	LookupCacheSize = 100

	// TractCacheSize is the number of _blobs_ for which we cache tract
	// locations. (Large blobs may use more space in the cache; there's no
	// weight tracking.)
	TractCacheSize = 100

	// TractLength defines the size of a full tract.
	TractLength = core.TractLength
)
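ParallelRPCs caps how many RPCs a single Read or Write call has in flight, not a process-wide total. The sketch below shows one common way to enforce such a per-call cap in Go, using a buffered channel as a counting semaphore. It is an illustration of the idea, not the package's implementation; runBounded and the fake work function are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// parallelRPCs mirrors the ParallelRPCs constant above.
const parallelRPCs = 12

// runBounded calls fn(i) for every i in [0, n) with at most limit calls in
// flight at once (limit must be positive). It returns the peak number of
// concurrent calls, reported only for illustration, and the first error seen.
func runBounded(n, limit int, fn func(i int) error) (peak int, firstErr error) {
	sem := make(chan struct{}, limit) // counting semaphore with 'limit' slots
	var wg sync.WaitGroup
	var mu sync.Mutex // guards inFlight, peak and firstErr
	inFlight := 0
	for i := 0; i < n; i++ {
		sem <- struct{}{} // blocks while 'limit' calls are already running
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when this call ends

			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			err := fn(i) // stands in for one read/write RPC

			mu.Lock()
			inFlight--
			if err != nil && firstErr == nil {
				firstErr = err
			}
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	return peak, firstErr
}

func main() {
	peak, err := runBounded(40, parallelRPCs, func(i int) error { return nil })
	fmt.Println("peak in flight:", peak, "error:", err)
}
```

Because each call to runBounded owns its own semaphore, two concurrent callers can together have up to twice the limit in flight, which matches the note that ParallelRPCs is not a global limit.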

Variables

var ErrInvalidBlobID = errors.New("invalid blob ID format")

ErrInvalidBlobID is the error returned when a string is not a valid BlobID representation.

Functions

func CreateContext

func CreateContext(ctx context.Context) createOpt

CreateContext associates a context with this Create call.

func CreatePriHigh

func CreatePriHigh(o *createOptions)

CreatePriHigh gives high priority to all disk operations related to this blob.

func CreatePriLow

func CreatePriLow(o *createOptions)

CreatePriLow gives low priority to all disk operations related to this blob.

func CreatePriMedium

func CreatePriMedium(o *createOptions)

CreatePriMedium gives medium priority to all disk operations related to this blob.

func IsRetriableError

func IsRetriableError(err error) bool

IsRetriableError reports whether 'err' is a retriable error. It assumes 'err' is not nil.

func OpenContext

func OpenContext(ctx context.Context) openOpt

OpenContext associates a context with this Open call.

func OpenPriHigh

func OpenPriHigh(o *openOptions)

OpenPriHigh gives high priority to all disk operations related to this blob.

func OpenPriLow

func OpenPriLow(o *openOptions)

OpenPriLow gives low priority to all disk operations related to this blob.

func OpenPriMedium

func OpenPriMedium(o *openOptions)

OpenPriMedium gives medium priority to all disk operations related to this blob.

func ReplFactor

func ReplFactor(n int) createOpt

ReplFactor causes the blob to be created with replication factor n.

func StorageCold

func StorageCold(o *createOptions)

StorageCold causes the blob to be created with an intention for cold storage.

func StorageDefault

func StorageDefault(o *createOptions)

StorageDefault causes the blob to be created with the default storage hint.

func StorageHot

func StorageHot(o *createOptions)

StorageHot causes the blob to be created with an intention for hot storage.

func StorageWarm

func StorageWarm(o *createOptions)

StorageWarm causes the blob to be created with an intention for warm storage.

func WithExpires

func WithExpires(e time.Time) createOpt

WithExpires causes the blob to be created with an expiration time.
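The signatures above imply a functional-options pattern: CreatePriHigh, StorageCold and friends have type func(*createOptions), while CreateContext, ReplFactor and WithExpires return a createOpt, and Create accepts ...createOpt. The following self-contained sketch mirrors that shape so it can run on its own. The fields of createOptions are hypothetical, since the real struct is unexported and undocumented.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// createOptions is a hypothetical stand-in for the unexported option struct;
// its real fields are not documented.
type createOptions struct {
	ctx     context.Context
	repl    int
	hint    string
	pri     string
	expires time.Time
}

// createOpt has the shape the documented signatures imply: plain functions
// such as CreatePriHigh(o *createOptions) can be passed directly, and
// constructors such as ReplFactor(n) return one.
type createOpt func(*createOptions)

func CreateContext(ctx context.Context) createOpt {
	return func(o *createOptions) { o.ctx = ctx }
}

func CreatePriHigh(o *createOptions) { o.pri = "high" }

func StorageCold(o *createOptions) { o.hint = "cold" }

func ReplFactor(n int) createOpt {
	return func(o *createOptions) { o.repl = n }
}

func WithExpires(e time.Time) createOpt {
	return func(o *createOptions) { o.expires = e }
}

// applyCreateOpts applies opts in order to a zero-valued struct (plus a
// background context), so a later option overrides an earlier one that sets
// the same field.
func applyCreateOpts(opts ...createOpt) createOptions {
	o := createOptions{ctx: context.Background()}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyCreateOpts(
		CreateContext(context.TODO()),
		ReplFactor(2),
		StorageCold,
		CreatePriHigh,
		WithExpires(time.Now().Add(24*time.Hour)),
	)
	fmt.Printf("repl=%d hint=%s pri=%s expires=%v\n", o.repl, o.hint, o.pri, o.expires)
}
```

With the real package the same options would simply be passed to Client.Create, for example cli.Create(blb.ReplFactor(2), blb.StorageCold), assuming the package is imported as blb.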

Types

type Blob

type Blob struct {
	// contains filtered or unexported fields
}

Blob is an implementation of the 'io.ReadWriteSeeker' interface built on top of the client driver. It provides an object handle for a stored blob. It is not thread-safe. PL-1157: (1) Should we make it thread-safe or leave synchronization to the user? (2) Optimize the implementation to reduce the number of RPCs to the curators.

func (*Blob) ByteLength

func (b *Blob) ByteLength() (int64, error)

ByteLength returns the length of the blob in bytes.

func (*Blob) ID

func (b *Blob) ID() BlobID

ID returns the BlobID of 'b'.

func (*Blob) Read

func (b *Blob) Read(p []byte) (int, error)

Read reads up to 'len(p)' bytes from 'b' into 'p'. It returns the number of bytes read and any error encountered.

func (*Blob) ReadAt

func (b *Blob) ReadAt(p []byte, offset int64) (int, error)

ReadAt reads up to 'len(p)' bytes from 'b', starting at 'offset', into 'p'. It returns the number of bytes read and any error encountered. Unlike Read, this method doesn't change the internal offset of 'b' used by the next Read or Write.

func (*Blob) Seek

func (b *Blob) Seek(offset int64, whence int) (int64, error)

Seek sets the offset for the next Read or Write to 'offset', interpreted according to 'whence': 0 (io.SeekStart) means relative to the origin of the blob, 1 (io.SeekCurrent) means relative to the current offset, and 2 (io.SeekEnd) means relative to the end. Seeking relative to the end is supported but by no means optimized. Seek returns the new offset and an error, if any.

Seeking to a negative offset is an error. Seeking to any positive offset is legal, but subsequent I/O operations on the underlying object can return errors if the offset is out of bounds.
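The offset arithmetic described above can be sketched as a pure function. This is an illustration, not Blob.Seek itself: resolveSeek and errNegativeOffset are hypothetical, the blob length is passed in by the caller, and keeping the old offset on error is an assumption because the real method's behavior in that case is not documented.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// errNegativeOffset is a hypothetical error value for this sketch.
var errNegativeOffset = errors.New("seek to a negative offset")

// resolveSeek computes the offset Seek is documented to produce. whence 0, 1
// and 2 are the same values as io.SeekStart, io.SeekCurrent and io.SeekEnd.
// On error the current offset is returned unchanged (an assumption).
func resolveSeek(cur, length, offset int64, whence int) (int64, error) {
	var base int64
	switch whence {
	case io.SeekStart:
		base = 0
	case io.SeekCurrent:
		base = cur
	case io.SeekEnd:
		// The real client presumably has to look the length up (for
		// example via ByteLength); here the caller supplies it.
		base = length
	default:
		return cur, fmt.Errorf("invalid whence %d", whence)
	}
	next := base + offset
	if next < 0 {
		return cur, errNegativeOffset
	}
	// Any non-negative offset is accepted, even past the end; later reads
	// or writes are where an out-of-bounds offset would surface as an error.
	return next, nil
}

func main() {
	fmt.Println(resolveSeek(100, 1000, -10, io.SeekEnd))
}
```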

func (*Blob) Stat

func (b *Blob) Stat() (core.BlobInfo, error)

Stat returns stat(2)-ish info about the blob.

func (*Blob) Write

func (b *Blob) Write(p []byte) (int, error)

Write writes 'len(p)' bytes to 'b'. It returns the number of bytes written from 'p' and any error encountered that caused the write to stop early.

func (*Blob) WriteAt

func (b *Blob) WriteAt(p []byte, offset int64) (int, error)

WriteAt writes 'len(p)' bytes to 'b' starting at 'offset'. It returns the number of bytes written from 'p' and any error encountered that caused the write to stop early. Unlike Write, this method doesn't change the internal offset of 'b' used by the next Write or Read.

type BlobID

type BlobID core.BlobID

BlobID refers to a Blob stored in Blb.

const NilBlobID BlobID = 0

NilBlobID means the ID is invalid.

func ParseBlobID

func ParseBlobID(s string) (BlobID, error)

ParseBlobID parses a BlobID from the provided string. The string must be in the format produced by BlobID.String(). If it is not, ErrInvalidBlobID is returned.

func (BlobID) String

func (b BlobID) String() string

String returns a human-readable string representation of the BlobID.

type BlobIterator

type BlobIterator func() ([]BlobID, error)

BlobIterator is a function that returns BlobIDs in batches.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client exposes a simple interface through which Blb users request services, and handles communication with the master, curators and tractservers under the hood.

func NewClient

func NewClient(options Options) *Client

NewClient returns a new Client that can be used to interact with a Blob cell. You must pass an Options value that contains the configuration of the client.

func NewMockClient

func NewMockClient() *Client

NewMockClient returns a new in-memory mock client that can only be used for testing.

func (*Client) ClusterID

func (cli *Client) ClusterID() string

ClusterID returns the cluster this client is connecting to.

func (*Client) Create

func (cli *Client) Create(opts ...createOpt) (*Blob, error)

Create creates a blob with the given options. If the operation fails due to a "retriable" error, it is retried internally according to the retry policy specified by the user.

func (*Client) Delete

func (cli *Client) Delete(ctx context.Context, id BlobID) error

Delete deletes the blob identified by 'id'. If the operation fails due to a "retriable" error, it is retried internally according to the retry policy specified by the user.

func (*Client) EnableCache

func (cli *Client) EnableCache(cacheEnabled bool)

EnableCache turns caching on or off.

func (*Client) GetTracts

func (cli *Client) GetTracts(ctx context.Context, id BlobID, start, end int) ([]core.TractInfo, error)

GetTracts returns a subset of the tracts for the blob. If the operation fails due to a "retriable" error, it is retried internally according to the retry policy specified by the user.
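To request only the tracts that cover a given byte range, a caller needs tract indices. The sketch below computes them as a half-open range, matching the ['start', 'end') convention documented on CuratorTalker.GetTracts. It assumes that tract i holds bytes [i*TractLength, (i+1)*TractLength) of the blob; that layout and the helper tractSpan are assumptions for illustration, not documented API.

```go
package main

import "fmt"

// tractSpan returns the half-open range [start, end) of tract indices that
// cover bytes [off, off+n) of a blob, assuming tract i holds bytes
// [i*tractLen, (i+1)*tractLen). An empty byte range yields an empty span.
func tractSpan(off, n, tractLen int64) (start, end int) {
	start = int(off / tractLen)
	if n <= 0 {
		return start, start
	}
	end = int((off + n + tractLen - 1) / tractLen) // ceil((off+n) / tractLen)
	return start, end
}

func main() {
	// A small tract length keeps the numbers readable; the real value is
	// TractLength (core.TractLength).
	const tractLen = 8
	for _, r := range [][2]int64{{0, 10}, {8, 8}, {7, 2}} {
		s, e := tractSpan(r[0], r[1], tractLen)
		fmt.Printf("bytes [%d, %d) -> tracts [%d, %d)\n", r[0], r[0]+r[1], s, e)
	}
}
```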

func (*Client) ListBlobs

func (cli *Client) ListBlobs(ctx context.Context) BlobIterator

ListBlobs returns an iterator that lists all existing blobs, in batches. Clients should keep calling the iterator until it returns nil or an error. The iterator should be used from a single goroutine.

ListBlobs is not guaranteed to return all blobs if cluster membership or raft leadership changes during iteration. It's intended for informative and diagnostic use only.
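Draining the iterator follows the rule stated above: keep calling it until it returns nil or an error, from one goroutine. The sketch redeclares BlobID and BlobIterator locally so it runs on its own; the uint64 underlying type of BlobID is an assumption, and fakeIterator is a hypothetical stand-in for the value returned by Client.ListBlobs.

```go
package main

import "fmt"

// BlobID and BlobIterator mirror the documented types; the uint64 underlying
// type of BlobID is an assumption made for this sketch.
type BlobID uint64

type BlobIterator func() ([]BlobID, error)

// collectAll calls next until it returns nil or an error, accumulating every
// ID it sees. It must run on a single goroutine, as ListBlobs requires.
func collectAll(next BlobIterator) ([]BlobID, error) {
	var all []BlobID
	for {
		ids, err := next()
		if err != nil {
			return all, err // keep what was collected before the failure
		}
		if ids == nil {
			return all, nil // iteration finished
		}
		all = append(all, ids...)
	}
}

// fakeIterator is a hypothetical in-memory iterator used only for the demo.
func fakeIterator(batches ...[]BlobID) BlobIterator {
	i := 0
	return func() ([]BlobID, error) {
		if i >= len(batches) {
			return nil, nil
		}
		b := batches[i]
		i++
		return b, nil
	}
}

func main() {
	ids, err := collectAll(fakeIterator([]BlobID{1, 2}, []BlobID{3}))
	fmt.Println(ids, err)
}
```

Since ListBlobs may miss blobs when membership or leadership changes mid-iteration, a result collected this way is best treated as a diagnostic snapshot.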

func (*Client) Open

func (cli *Client) Open(id BlobID, mode string, opts ...openOpt) (*Blob, error)

Open opens the blob referenced by 'id'. If the operation fails due to a "retriable" error, it is retried internally according to the retry policy specified by the user. 'mode' should be a combination of "r" for reading, "w" for writing, and "s" for statting.
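A mode such as "rw" or "rs" is a set of single-letter flags. The sketch below decodes one into booleans. How the real client validates the string (for example whether it rejects unknown letters or an empty mode) is not documented, so openMode, parseMode and its error cases are hypothetical choices.

```go
package main

import "fmt"

// openMode is a hypothetical decoded form of Open's mode string.
type openMode struct {
	read, write, stat bool
}

// parseMode decodes a mode built from "r" (read), "w" (write) and "s" (stat).
// Rejecting an empty mode and unknown letters is this sketch's choice.
func parseMode(mode string) (openMode, error) {
	var m openMode
	if mode == "" {
		return m, fmt.Errorf("empty mode")
	}
	for _, c := range mode {
		switch c {
		case 'r':
			m.read = true
		case 'w':
			m.write = true
		case 's':
			m.stat = true
		default:
			return openMode{}, fmt.Errorf("invalid mode character %q in %q", c, mode)
		}
	}
	return m, nil
}

func main() {
	m, err := parseMode("rw")
	fmt.Printf("%+v %v\n", m, err)
}
```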

func (*Client) SetMetadata

func (cli *Client) SetMetadata(ctx context.Context, id BlobID, metadata core.BlobInfo) error

SetMetadata allows changing various fields of blob metadata. Currently, changing the storage hint, mtime, and atime is supported.

func (*Client) Undelete

func (cli *Client) Undelete(ctx context.Context, id BlobID) error

Undelete undeletes the blob identified by 'id'.

type CuratorTalker

type CuratorTalker interface {
	// CreateBlob creates a blob. Only Repl, Hint, and Expires in 'metadata' are used.
	CreateBlob(ctx context.Context, addr string, metadata core.BlobInfo) (core.BlobID, core.Error)

	// ExtendBlob extends 'blob' until it has 'numTracts' tracts, and returns
	// the new tracts.
	ExtendBlob(ctx context.Context, addr string, blob core.BlobID, numTracts int) ([]core.TractInfo, core.Error)

	// AckExtendBlob acks the success of extending 'blob' with the new
	// tracts 'tracts'.
	AckExtendBlob(ctx context.Context, addr string, blob core.BlobID, tracts []core.TractInfo) core.Error

	// DeleteBlob deletes 'blob'.
	DeleteBlob(ctx context.Context, addr string, blob core.BlobID) core.Error

	// UndeleteBlob tries to un-delete 'blob'.
	UndeleteBlob(ctx context.Context, addr string, blob core.BlobID) core.Error

	// SetMetadata changes some metadata for the given blob.
	SetMetadata(ctx context.Context, addr string, blob core.BlobID, md core.BlobInfo) core.Error

	// GetTracts retrieves the tracts ['start', 'end') for 'blob'.
	GetTracts(ctx context.Context, addr string, blob core.BlobID, start, end int, forRead, forWrite bool) ([]core.TractInfo, core.Error)

	// StatBlob gets information about a blob.
	StatBlob(ctx context.Context, addr string, blob core.BlobID) (core.BlobInfo, core.Error)

	// ReportBadTS asks the curator to re-replicate or recover the provided tract or chunk as
	// the host 'bad' is unavailable.  The curator may ignore this, or it may re-replicate.
	ReportBadTS(ctx context.Context, addr string, id core.TractID, bad, op string, got core.Error, couldRecover bool) core.Error

	// FixVersion asks the curator at 'addr' to check the version(s) of the provided
	// tract.  Sent when a client attempted an I/O to the tractserver at 'bad' with the
	// tract in 'tract' but had a version mismatch error.
	FixVersion(ctx context.Context, addr string, tract core.TractInfo, bad string) core.Error

	// ListBlobs gets a contiguous range of blob ids in a given partition, whose
	// keys (lower part of id) are >= 'start'. The server may choose how many to
	// return at once, but it will be at least one if any exist. The keys will
	// be a contiguous range of the id space, and will be in order. An empty
	// return value means that no blobs in that part of the id space exist.
	ListBlobs(ctx context.Context, addr string, partition core.PartitionID, start core.BlobKey) ([]core.BlobKey, core.Error)
}

CuratorTalker manages connections to curators.
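The ListBlobs contract above (keys >= 'start', returned in order, at least one if any exist, empty when none remain) supports simple paging: start the next call just after the last key received. In this sketch BlobKey is assumed to be a uint32, listFunc stands in for a ListBlobs call already bound to a context, curator address and partition, and it returns a plain error instead of core.Error; fakeList is a hypothetical server.

```go
package main

import (
	"fmt"
	"math"
)

// BlobKey mirrors core.BlobKey; uint32 is an assumption for this sketch.
type BlobKey uint32

// listFunc stands in for CuratorTalker.ListBlobs bound to one partition.
type listFunc func(start BlobKey) ([]BlobKey, error)

// listPartition pages through a partition. Each call returns keys >= start in
// order, so the next page starts just after the last key received. An empty
// page means no keys remain at or above start.
func listPartition(list listFunc) ([]BlobKey, error) {
	var all []BlobKey
	start := BlobKey(0)
	for {
		keys, err := list(start)
		if err != nil {
			return all, err
		}
		if len(keys) == 0 {
			return all, nil
		}
		all = append(all, keys...)
		last := keys[len(keys)-1]
		if last == math.MaxUint32 {
			return all, nil // key space exhausted; avoid wrapping to 0
		}
		start = last + 1
	}
}

// fakeList is a hypothetical server returning at most two keys per call from
// a sorted slice.
func fakeList(existing []BlobKey) listFunc {
	return func(start BlobKey) ([]BlobKey, error) {
		var out []BlobKey
		for _, k := range existing {
			if k >= start && len(out) < 2 {
				out = append(out, k)
			}
		}
		return out, nil
	}
}

func main() {
	keys, err := listPartition(fakeList([]BlobKey{1, 4, 5, 9, 10}))
	fmt.Println(keys, err)
}
```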

func NewRPCCuratorTalker

func NewRPCCuratorTalker() CuratorTalker

NewRPCCuratorTalker creates a new RPCCuratorTalker.

type MasterConnection

type MasterConnection interface {
	// MasterCreateBlob returns a curator suitable for creating a new blob.
	MasterCreateBlob(ctx context.Context) (curator string, err core.Error)

	// LookupPartition returns which curator is responsible for 'partition'.
	LookupPartition(ctx context.Context, part core.PartitionID) (curator string, err core.Error)

	// ListPartitions returns all existing partitions.
	ListPartitions(ctx context.Context) (partitions []core.PartitionID, err core.Error)

	// GetTractserverInfo gets tractserver state.
	GetTractserverInfo(ctx context.Context) (info []core.TractserverInfo, err core.Error)
}

MasterConnection is a connection to master.

func NewRPCMasterConnection

func NewRPCMasterConnection(addrSpec string) MasterConnection

NewRPCMasterConnection creates a new RPCMasterConnection to the master set with addresses in 'addrSpec'.

type Options

type Options struct {
	// The cluster this client will connect to. Most users can use a simple
	// cluster name here. This may also be a cluster/user/service string, to
	// fully specify a service discovery record, or a comma-separated list of
	// master hostnames. If empty, we will try to connect to the default cluster
	// (but this is not recommended; try to use an explicit cluster name if
	// possible).
	Cluster string

	// DisableRetry, if true, stops the client from retrying operations that
	// failed due to "retriable" errors.
	DisableRetry bool

	// RetryTimeout bounds the total time of retries if it's greater than zero.
	RetryTimeout time.Duration

	// DisableCache, if true, stops the client from using its cache.
	DisableCache bool

	// Instance is an optional label to differentiate metrics from different
	// client instances. It defaults to "default" if not specified.
	Instance string

	// How the client decides whether to attempt client-side RS reconstruction.
	ReconstructBehavior ReconstructBehavior
}

Options contains the configuration of a client.
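DisableRetry, RetryTimeout and IsRetriableError together describe a retry policy. The sketch below shows one way such a policy could be applied around an operation. The fixed 10ms backoff, the unbounded budget when RetryTimeout is zero, and the names withRetry, retryOptions and errTransient are all assumptions for illustration; the client's actual retry schedule is not documented here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a hypothetical retriable error used only in this sketch.
var errTransient = errors.New("transient failure")

// retryOptions mirrors the DisableRetry and RetryTimeout fields of Options.
type retryOptions struct {
	DisableRetry bool
	RetryTimeout time.Duration
}

// withRetry runs op until it succeeds, returns a non-retriable error, or the
// RetryTimeout budget (when greater than zero) would be exceeded by another
// attempt. isRetriable plays the role of IsRetriableError.
func withRetry(opts retryOptions, isRetriable func(error) bool, op func() error) error {
	const backoff = 10 * time.Millisecond // arbitrary fixed delay for the sketch
	var deadline time.Time
	if opts.RetryTimeout > 0 {
		deadline = time.Now().Add(opts.RetryTimeout)
	}
	for {
		err := op()
		if err == nil || opts.DisableRetry || !isRetriable(err) {
			return err
		}
		if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) {
			return err // one more attempt would overrun the retry budget
		}
		time.Sleep(backoff)
	}
}

func main() {
	attempts := 0
	err := withRetry(retryOptions{RetryTimeout: time.Second},
		func(err error) bool { return errors.Is(err, errTransient) },
		func() error {
			attempts++
			if attempts < 3 {
				return errTransient // fail twice, then succeed
			}
			return nil
		})
	fmt.Println("attempts:", attempts, "error:", err)
}
```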

type RPCCuratorTalker

type RPCCuratorTalker struct {
	// contains filtered or unexported fields
}

RPCCuratorTalker implements CuratorTalker based on Go's RPC package.

func (*RPCCuratorTalker) AckExtendBlob

func (r *RPCCuratorTalker) AckExtendBlob(ctx context.Context, addr string, blob core.BlobID, tracts []core.TractInfo) core.Error

AckExtendBlob acks the success of extending 'blob' with the new tracts 'tracts'.

func (*RPCCuratorTalker) CreateBlob

func (r *RPCCuratorTalker) CreateBlob(ctx context.Context, addr string, metadata core.BlobInfo) (core.BlobID, core.Error)

CreateBlob implements CuratorTalker.

func (*RPCCuratorTalker) DeleteBlob

func (r *RPCCuratorTalker) DeleteBlob(ctx context.Context, addr string, blob core.BlobID) core.Error

DeleteBlob implements CuratorTalker.

func (*RPCCuratorTalker) ExtendBlob

func (r *RPCCuratorTalker) ExtendBlob(ctx context.Context, addr string, blob core.BlobID, numTracts int) ([]core.TractInfo, core.Error)

ExtendBlob implements CuratorTalker.

func (*RPCCuratorTalker) FixVersion

func (r *RPCCuratorTalker) FixVersion(ctx context.Context, addr string, tract core.TractInfo, bad string) core.Error

FixVersion asks the curator to verify the version(s) of the provided tract as we were unable to use the information in 'tract' to do IO with 'bad'.

func (*RPCCuratorTalker) GetTracts

func (r *RPCCuratorTalker) GetTracts(ctx context.Context, addr string, blob core.BlobID, start, end int,
	forRead, forWrite bool) ([]core.TractInfo, core.Error)

GetTracts implements CuratorTalker.

func (*RPCCuratorTalker) ListBlobs

func (r *RPCCuratorTalker) ListBlobs(ctx context.Context, addr string, partition core.PartitionID, start core.BlobKey) ([]core.BlobKey, core.Error)

ListBlobs gets a contiguous range of blob ids in a given partition.

func (*RPCCuratorTalker) ReportBadTS

func (r *RPCCuratorTalker) ReportBadTS(ctx context.Context, addr string, id core.TractID, bad, op string, got core.Error, couldRecover bool) core.Error

ReportBadTS tells the curator to re-replicate 'id' replacing 'bad'.

func (*RPCCuratorTalker) SetMetadata

func (r *RPCCuratorTalker) SetMetadata(ctx context.Context, addr string, blob core.BlobID, md core.BlobInfo) core.Error

SetMetadata implements CuratorTalker.

func (*RPCCuratorTalker) StatBlob

func (r *RPCCuratorTalker) StatBlob(ctx context.Context, addr string, blob core.BlobID) (core.BlobInfo, core.Error)

StatBlob implements CuratorTalker.

func (*RPCCuratorTalker) UndeleteBlob

func (r *RPCCuratorTalker) UndeleteBlob(ctx context.Context, addr string, blob core.BlobID) core.Error

UndeleteBlob implements CuratorTalker.

type RPCMasterConnection

type RPCMasterConnection struct {
	// contains filtered or unexported fields
}

RPCMasterConnection implements MasterConnection based on Go's RPC package.

func (*RPCMasterConnection) GetTractserverInfo

func (r *RPCMasterConnection) GetTractserverInfo(ctx context.Context) (info []core.TractserverInfo, err core.Error)

GetTractserverInfo gets tractserver state.

func (*RPCMasterConnection) ListPartitions

func (r *RPCMasterConnection) ListPartitions(ctx context.Context) (partitions []core.PartitionID, err core.Error)

ListPartitions returns all existing partitions.

func (*RPCMasterConnection) LookupPartition

func (r *RPCMasterConnection) LookupPartition(ctx context.Context, partition core.PartitionID) (curator string, err core.Error)

LookupPartition calls master 'Lookup' to locate the curator for 'partition'.

func (*RPCMasterConnection) MasterCreateBlob

func (r *RPCMasterConnection) MasterCreateBlob(ctx context.Context) (curator string, err core.Error)

MasterCreateBlob calls master 'Create' to choose a curator to create a blob.

type RPCTractserverTalker

type RPCTractserverTalker struct {
	// contains filtered or unexported fields
}

RPCTractserverTalker implements TractserverTalker based on Go RPC.

func (*RPCTractserverTalker) Create

func (r *RPCTractserverTalker) Create(ctx context.Context, addr string, tsid core.TractserverID, id core.TractID, b []byte, off int64) core.Error

Create creates a new tract on the tractserver and does a write to the newly created tract.

func (*RPCTractserverTalker) GetDiskInfo

func (r *RPCTractserverTalker) GetDiskInfo(ctx context.Context, addr string) ([]core.FsStatus, core.Error)

GetDiskInfo returns a summary of disk info.

func (*RPCTractserverTalker) Read

func (r *RPCTractserverTalker) Read(ctx context.Context, addr string, id core.TractID, version int, len int, off int64) ([]byte, core.Error)

Read reads from a tract.

func (*RPCTractserverTalker) ReadInto

func (r *RPCTractserverTalker) ReadInto(ctx context.Context, addr string, id core.TractID, version int, b []byte, off int64) (int, core.Error)

ReadInto reads from a tract into a provided slice without copying.

func (*RPCTractserverTalker) SetControlFlags

func (r *RPCTractserverTalker) SetControlFlags(ctx context.Context, addr string, root string, flags core.DiskControlFlags) core.Error

SetControlFlags changes control flags for a disk.

func (*RPCTractserverTalker) StatTract

func (r *RPCTractserverTalker) StatTract(ctx context.Context, addr string, id core.TractID, version int) (int64, core.Error)

StatTract returns the number of bytes in a tract.

func (*RPCTractserverTalker) Write

func (r *RPCTractserverTalker) Write(ctx context.Context, addr string, id core.TractID, version int, b []byte, off int64) core.Error

Write does a write to a tract on this tractserver.

type ReadaheadBlob

type ReadaheadBlob struct {
	// contains filtered or unexported fields
}

ReadaheadBlob wraps an existing Blob for reading and tries to prefetch data for better performance. It implements the io.ReadSeeker interface.

func NewReadaheadBlob

func NewReadaheadBlob(b *Blob) *ReadaheadBlob

NewReadaheadBlob wraps an existing blob and does readahead on top of that.

func (*ReadaheadBlob) ByteLength

func (b *ReadaheadBlob) ByteLength() (int64, error)

ByteLength returns the length of the blob in bytes.

func (*ReadaheadBlob) ID

func (b *ReadaheadBlob) ID() BlobID

ID returns the BlobID of 'b'.

func (*ReadaheadBlob) Read

func (b *ReadaheadBlob) Read(p []byte) (int, error)

Read does the same thing as Blob.Read does.

func (*ReadaheadBlob) Seek

func (b *ReadaheadBlob) Seek(offset int64, whence int) (int64, error)

Seek does the same thing as Blob.Seek does. When a seek is performed, all prefetched data is discarded. Internally, ReadaheadBlob keeps a buffer the size of one tract, so presumably it issues just one read per tract. But if you seek to the middle of a tract, the buffer is no longer aligned with tract boundaries, so it might end up issuing more than one read per tract.

TODO: we might want to optimize this by duplicating the bufio logic here.

func (*ReadaheadBlob) Stat

func (b *ReadaheadBlob) Stat() (core.BlobInfo, error)

Stat returns stat(2)-ish info about the blob.

type ReconstructBehavior

type ReconstructBehavior struct {
	Enabled     bool
	MaxInFlight int
}

ReconstructBehavior lets clients control when the client attempts client-side RS reconstruction (instead of just retrying and waiting for the curator to do it).
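The documentation does not spell out how MaxInFlight is enforced. One plausible reading, sketched below, is a non-blocking admission gate: at most MaxInFlight client-side reconstructions run at once, and a refused caller falls back to retrying and waiting for the curator. reconstructGate, its methods, and treating MaxInFlight <= 0 as disabled are assumptions for illustration only.

```go
package main

import "fmt"

// ReconstructBehavior mirrors the documented struct.
type ReconstructBehavior struct {
	Enabled     bool
	MaxInFlight int
}

// reconstructGate is a hypothetical helper that admits at most MaxInFlight
// concurrent reconstructions and never blocks the caller.
type reconstructGate struct {
	slots chan struct{} // nil when reconstruction is disabled
}

func newReconstructGate(b ReconstructBehavior) *reconstructGate {
	if !b.Enabled || b.MaxInFlight <= 0 {
		return &reconstructGate{} // tryAcquire will always refuse
	}
	return &reconstructGate{slots: make(chan struct{}, b.MaxInFlight)}
}

// tryAcquire reports whether a reconstruction may start now. A false result
// means the caller should fall back to retrying and letting the curator act.
func (g *reconstructGate) tryAcquire() bool {
	if g.slots == nil {
		return false
	}
	select {
	case g.slots <- struct{}{}:
		return true
	default:
		return false // all slots busy
	}
}

// release frees a slot; call it only after a successful tryAcquire.
func (g *reconstructGate) release() { <-g.slots }

func main() {
	g := newReconstructGate(ReconstructBehavior{Enabled: true, MaxInFlight: 2})
	fmt.Println(g.tryAcquire(), g.tryAcquire(), g.tryAcquire())
	g.release()
	fmt.Println(g.tryAcquire())
}
```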

type TractserverTalker

type TractserverTalker interface {
	// Create creates a new tract on the tractserver and does a write to the
	// newly created tract.
	Create(ctx context.Context, addr string, tsid core.TractserverID, id core.TractID, b []byte, off int64) core.Error

	// Write does a write to a tract on this tractserver.
	Write(ctx context.Context, addr string, id core.TractID, version int, b []byte, off int64) core.Error

	// Read reads from a tract.
	Read(ctx context.Context, addr string, id core.TractID, version int, len int, off int64) ([]byte, core.Error)

	// ReadInto reads from a tract. It will try to read up to len(b) bytes and put them in b.
	// It returns the number of bytes read, as in io.Reader's Read.
	ReadInto(ctx context.Context, addr string, id core.TractID, version int, b []byte, off int64) (int, core.Error)

	// StatTract returns the number of bytes in a tract.
	StatTract(ctx context.Context, addr string, id core.TractID, version int) (int64, core.Error)

	// GetDiskInfo returns a summary of disk info.
	GetDiskInfo(ctx context.Context, addr string) ([]core.FsStatus, core.Error)

	// SetControlFlags changes control flags for a disk.
	SetControlFlags(ctx context.Context, addr string, root string, flags core.DiskControlFlags) core.Error
}

TractserverTalker manages connections to tractservers.

func NewRPCTractserverTalker

func NewRPCTractserverTalker() TractserverTalker

NewRPCTractserverTalker returns a new Go RPC based implementation of TractserverTalker.
