storage

package v0.0.0-...-2bf1e3b
Published: Jan 30, 2024 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrInvalidID

type ErrInvalidID struct{}

func (ErrInvalidID) Error

func (e ErrInvalidID) Error() string

type ErrNotFound

type ErrNotFound string

func (ErrNotFound) Error

func (e ErrNotFound) Error() string

type ErrNotPossible

type ErrNotPossible struct{}

func (ErrNotPossible) Error

func (ErrNotPossible) Error() string

type ErrReplicaNotFound

type ErrReplicaNotFound string

func (ErrReplicaNotFound) Error

func (e ErrReplicaNotFound) Error() string

type ErrWrongReplicaState

type ErrWrongReplicaState struct{}

func (ErrWrongReplicaState) Error

func (e ErrWrongReplicaState) Error() string

type InsertData

type InsertData struct {
	// The actual data written to disk will be read from this Reader.
	Source io.Reader

	// If provided then this will ensure that the data received from the
	// client is at least this long. If this is zero then it is assumed
	// that the expected length of the data is unknown and therefore should
	// be read until EOF.
	Length int64

	// If this is defined then tracing will be used at various points during
	// the insertion process. If this is nil then no tracing will be performed.
	Tracer *tracing.Trace
}

When calling Insert there are many different values that can be provided, which are all bundled up here. This makes it a little cleaner to pass the data between server -> storage -> primary.
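A minimal sketch of an insert with a known length (s, ctx, and payload are assumed here, and the storage qualifier stands in for the real import path, which is elided above):

data := &storage.InsertData{
	// Read the object's bytes from an in-memory buffer.
	Source: bytes.NewReader(payload),
	// Known length; leaving this zero would mean "read until EOF".
	Length: int64(len(payload)),
	// Tracer is left nil, so no tracing is performed.
}
id, err := s.Insert(ctx, data)
if err != nil {
	// On error the data may or may not have been written.
	return err
}
fmt.Println("stored as", id)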

type ReadConfig

type ReadConfig interface {
	// The NameSpace and ID being requested from this storage instance.
	NameSpace() string
	ID() string

	// Returns some FID-specific fields as a helper.
	FID() fid.FID
	FIDString() string
	Machine() uint32
	Start() uint64
	Length() uint32

	// If this returns true then the read request will not attempt to make
	// any remote calls. It will purely attempt to fetch the file from the
	// local cache and return 404 if it's not found locally.
	LocalOnly() bool

	// Returns the Logger that is associated with this Read operation. If
	// this returns nil then a logger will be created from the BaseLogger
	// in the Storage object.
	Logger() *slog.Logger

	// Since the request may need to be forwarded on to a Remote we need to
	// allow a way for request contexts to be proxied. Since the Storage
	// implementation does not know or care what those contexts are they
	// are simply provided as an interface.
	Context() interface{}
}

When performing a read, the arguments can be somewhat fluid due to the way that requests are made. Authentication may need to be proxied through to a remote in a way that normal Inserts do not require. As such, this allows the upstream caller to provide Remote-specific details that the storage implementation is simply unaware of.
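A hypothetical minimal implementation that carries pre-computed values in a plain struct (all field names here are invented; fid and slog are assumed to be imported as in the interface above):

type readConfig struct {
	ns, id    string
	f         fid.FID
	fidStr    string
	machine   uint32
	start     uint64
	length    uint32
	localOnly bool
	logger    *slog.Logger
	ctx       interface{}
}

func (r readConfig) NameSpace() string    { return r.ns }
func (r readConfig) ID() string           { return r.id }
func (r readConfig) FID() fid.FID         { return r.f }
func (r readConfig) FIDString() string    { return r.fidStr }
func (r readConfig) Machine() uint32      { return r.machine }
func (r readConfig) Start() uint64        { return r.start }
func (r readConfig) Length() uint32       { return r.length }
func (r readConfig) LocalOnly() bool      { return r.localOnly }
func (r readConfig) Logger() *slog.Logger { return r.logger }
func (r readConfig) Context() interface{} { return r.ctx }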

type Remote

type Remote interface {
	Delete(namespace, fn string) error
	HeartBeat(namespace, fn string) (bool, error)
	Initialize(namespace, fn string) error
	Read(rc ReadConfig) (io.ReadCloser, error)
	Replicate(rc RemoteReplicateConfig) (bool, error)
	String() string
}

An interface that the Storage object will use when interfacing with remote instances. This allows remotes to use any number of protocols or access methods rather than statically defining them in the storage module.
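As a sketch, a Remote could be backed by plain HTTP. Every endpoint path and status convention below is invented for illustration and is not part of this package:

type httpRemote struct {
	base string // e.g. "http://replica-1:7520"
}

func (h httpRemote) String() string { return h.base }

// call issues a request and reports whether the remote answered 200 OK.
func (h httpRemote) call(method, path string, body io.Reader) (bool, error) {
	req, err := http.NewRequest(method, h.base+path, body)
	if err != nil {
		return false, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return false, err
	}
	resp.Body.Close()
	return resp.StatusCode == http.StatusOK, nil
}

func (h httpRemote) Initialize(namespace, fn string) error {
	_, err := h.call(http.MethodPost, "/replica/"+namespace+"/"+fn, nil)
	return err
}

func (h httpRemote) Delete(namespace, fn string) error {
	_, err := h.call(http.MethodDelete, "/replica/"+namespace+"/"+fn, nil)
	return err
}

func (h httpRemote) HeartBeat(namespace, fn string) (bool, error) {
	return h.call(http.MethodPost, "/heartbeat/"+namespace+"/"+fn, nil)
}

func (h httpRemote) Replicate(rc RemoteReplicateConfig) (bool, error) {
	path := fmt.Sprintf("/replicate/%s/%s?offset=%d&size=%d&hash=%s",
		rc.NameSpace(), rc.FileName(), rc.Offset(), rc.Size(), rc.Hash())
	return h.call(http.MethodPut, path, rc.GetBody())
}

func (h httpRemote) Read(rc ReadConfig) (io.ReadCloser, error) {
	resp, err := http.Get(h.base + "/read/" + rc.NameSpace() + "/" + rc.ID())
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("read %s: %s", rc.ID(), resp.Status)
	}
	return resp.Body, nil
}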

type RemoteReplicateConfig

type RemoteReplicateConfig interface {
	FileName() string
	GetBody() io.ReadCloser
	Hash() string
	NameSpace() string
	Offset() uint64
	Size() uint64
}

All of the arguments to Replicate() are bundled up here in order to make it easier to pass them around. This vastly simplifies the function signature.
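A plain struct carrier is enough to satisfy the interface; the field names below are invented for illustration:

type replicateConfig struct {
	fileName  string
	body      io.ReadCloser
	hash      string
	nameSpace string
	offset    uint64
	size      uint64
}

func (r replicateConfig) FileName() string       { return r.fileName }
func (r replicateConfig) GetBody() io.ReadCloser { return r.body }
func (r replicateConfig) Hash() string           { return r.hash }
func (r replicateConfig) NameSpace() string      { return r.nameSpace }
func (r replicateConfig) Offset() uint64         { return r.offset }
func (r replicateConfig) Size() uint64           { return r.size }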

type Settings

type Settings struct {
	// Used to perform uploads from this namespace.
	AWSUploader *s3manager.Uploader

	// A function that will return a pool of Blobby remotes that
	// should be used for a new Replica.
	AssignRemotes func(int) ([]Remote, error)

	// The base directory that files will be stored in for this namespace.
	BaseDirectory string

	// If this is set to something other than nil then logging will be
	// written to this output.
	BaseLogger *slog.Logger

	// When set to true the file will be compressed before it is uploaded
	// to S3. This will break the ability to fetch identifiers that are not
	// found in the local file system cache, so use accordingly.
	Compress      bool
	CompressLevel int

	// A work queue for Compression related activities.
	CompressWorkQueue *workqueue.WorkQueue

	// If configured to do so then blobby will keep the primary file around
	// after it has been uploaded. This allows Read() operations to use the
	// local file rather than fetching from S3.
	DelayDelete time.Duration

	// The DelayQueue that will be used to schedule events like heart beat
	// timers, replica timeouts, etc.
	DelayQueue *delayqueue.DelayQueue

	// A WorkQueue for processing local file delete requests.
	DeleteLocalWorkQueue *workqueue.WorkQueue

	// A WorkQueue for processing remote replica delete requests.
	DeleteRemotesWorkQueue *workqueue.WorkQueue

	// After this amount of time a replica will be considered "orphaned" and
	// will trigger an upload of the data. This ensures that a primary being
	// lost won't cause data loss.
	HeartBeatTime time.Duration

	// The machine ID that is serving this namespace. This must be unique
	// across all of the instances in the list of remotes.
	MachineID uint32

	// The name of the namespace that this Storage implementation will
	// be serving.
	NameSpace string

	// The prefix for the namespace= tag; a value of blobby_ for this field
	// would give blobby_namespace as the tag key in the rendered Prometheus
	// metrics.
	NamespaceTagKeyPrefix string

	// The minimum and maximum number of master files that are allowed
	// to be open at once.
	OpenFilesMaximum int32
	OpenFilesMinimum int32

	// A function that fetches data from a remote.
	Read func(ReadConfig) (io.ReadCloser, error)

	// The number of replicas that each master file should be assigned.
	Replicas int

	// S3 client used for downloading objects from S3.
	S3Client *s3.S3

	// The S3 bucket and base path used for uploads as well as an optional
	// formatter for the file name as it will be written to S3. If not
	// provided it will default to the FID string.
	S3Bucket    string
	S3BasePath  string
	S3KeyFormat *fid.Formatter

	// If a file grows beyond this size then it will be moved into an
	// uploading state.
	UploadLargerThan uint64

	// Upload files after this much time regardless of size.
	UploadOlder time.Duration

	// A WorkQueue for processing Upload requests.
	UploadWorkQueue *workqueue.WorkQueue
}

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

Handles on-disk storage for a Blobby instance.

func New

func New(settings *Settings) *Storage

Creates a new Storage object from the given settings.
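A sketch of a minimally populated Settings (field values are illustrative only; work queues, S3 wiring, and most optional fields are omitted, and storage stands in for the real import path):

s := storage.New(&storage.Settings{
	NameSpace:        "events",
	BaseDirectory:    "/var/lib/blobby/events",
	MachineID:        1, // must be unique across the remotes
	Replicas:         2,
	BaseLogger:       slog.Default(),
	HeartBeatTime:    30 * time.Second,
	UploadOlder:      5 * time.Minute,
	UploadLargerThan: 1 << 30, // 1 GiB
})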

func (*Storage) BlastPathRead

func (s *Storage) BlastPathRead(
	fid string,
	start uint64,
	end uint64,
) (
	io.ReadCloser,
	error,
)

"Blast Path" read function that can fetch a ranged portion of a primary file. This works just like Read except that it will only read from local files and works on byte ranges rather than Blobby IDs.

func (*Storage) BlastPathStatus

func (s *Storage) BlastPathStatus(out io.Writer)

"Blast Path" status output for this Storage Name Space. This will output the current status of the primaries hosted by this instance.

func (*Storage) DebugID

func (s *Storage) DebugID(out io.Writer, id string)

Writes debugging information about the given ID to the writer given.

func (*Storage) GetMetrics

func (s *Storage) GetMetrics() (m metrics.Metrics)

Returns a copy of the metrics associated with this Storage object.

func (*Storage) Health

func (s *Storage) Health() (bool, string)

Returns true if this Storage is healthy, along with a string describing the reason for that status.

func (*Storage) Insert

func (s *Storage) Insert(
	ctx context.Context,
	data *InsertData,
) (
	id string,
	err error,
)

Inserts new data into one of the open primary files. This will return the ID of the newly created object or an error if something went wrong. If an error is returned then it is not safe to assume that the data was successfully written.

func (*Storage) Read

func (s *Storage) Read(
	ctx context.Context,
	rc ReadConfig,
) (
	io.ReadCloser,
	error,
)

Reads an individual ID (provided via rc). This may involve directly talking to S3, or talking to the remote machine that is serving the given ID.
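A sketch of a read call reusing the hypothetical readConfig carrier from the ReadConfig section above (in practice the FID-derived fields would also need to be populated):

body, err := s.Read(ctx, readConfig{
	ns: "events",
	id: "some-blobby-id", // illustrative
	// localOnly left false so S3 and remote fetches are allowed.
})
if err != nil {
	return err
}
defer body.Close()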

func (*Storage) ReplicaHeartBeat

func (s *Storage) ReplicaHeartBeat(ctx context.Context, fn string) error

Performs a Heart Beat on a replica. The only error condition here is that the replica does not exist.

func (*Storage) ReplicaInitialize

func (s *Storage) ReplicaInitialize(ctx context.Context, fn string) error

Initializes a new replica in this Storage instance.

func (*Storage) ReplicaQueueDelete

func (s *Storage) ReplicaQueueDelete(ctx context.Context, fn string) error

Queues a replica file for deletion.

func (*Storage) ReplicaReplicate

func (s *Storage) ReplicaReplicate(
	ctx context.Context,
	fn string,
	rc RemoteReplicateConfig,
) error

Performs a replica replication call.

func (*Storage) Start

func (s *Storage) Start(ctx context.Context) error

Starts all of the supporting routines for this Storage implementation. This will also scan the storage directory looking for files created by a previous run of blobby. These will be automatically configured as a replica that is in the Uploading state in order to ensure that the data is written to S3 as quickly as possible since it may be from a failed instance.
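A sketch of typical startup wiring (the fatal error handling is illustrative):

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if err := s.Start(ctx); err != nil {
	log.Fatalf("storage start: %v", err)
}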

func (*Storage) Status

func (s *Storage) Status(out io.Writer)

Gets the status for this Storage implementation and writes it to the given io.Writer. This is a human readable status intended for administration so the format is undefined.
