
v1.13.0 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Jul 3, 2024 License: Apache-2.0 Imports: 41 Imported by: 56



Package storage defines an extensible storage interface. This package registers the "storage" config section, which maps to the Config struct. Use NewDataStore(cfg) to initialize a DataStore with the provided config. The package provides default implementations to access local, S3 (and minio), and in-memory storage. Use NewCompositeDataStore to swap any portion of the DataStore interface with an external implementation (e.g. a cached protobuf store). The underlying storage is provided by the extensible "stow" library. You can use NewStowRawStore(cfg) to create a raw store based on any other stow-supported config (e.g. Azure Blob Storage).




View Source
const (
	KiB int64 = 1024
	MiB int64 = 1024 * KiB
View Source
const FailureTypeLabel contextutils.Key = "failure_type"
View Source
const FlyteContentMD5 = "flyteContentMD5"


View Source
var (
	ErrExceedsLimit       stdErrs.ErrorCode = "LIMIT_EXCEEDED"
	ErrFailedToWriteCache stdErrs.ErrorCode = "CACHE_WRITE_FAILED"
View Source
var (
	ConfigSection = config.MustRegisterSection(configSectionKey, defaultConfig)


func IsExceedsLimit

func IsExceedsLimit(err error) bool

IsExceedsLimit gets a value indicating whether the root cause of error is a "limit exceeded" error.

func IsExists

func IsExists(err error) bool

IsExists gets a value indicating whether the underlying error is an "already exists" error.

func IsFailedWriteToCache

func IsFailedWriteToCache(err error) bool

func IsNotFound

func IsNotFound(err error) bool

IsNotFound gets a value indicating whether the underlying error is a Not Found error.

func MapStrings

func MapStrings(mapper func(string) string, strings ...string) []string

func MergeMaps

func MergeMaps(dst map[string]string, src ...map[string]string)

MergeMaps merges all src maps into dst in order.

func RegisterStowKind

func RegisterStowKind(kind string, f func(string) DataReference) error

RegisterStowKind registers a new kind of stow store.


type CachingConfig

type CachingConfig struct {
	// Maximum size of the cache where the Blob store data is cached in-memory
	// Refer to https://github.com/coocood/freecache to understand how to set the value
	// If not specified or set to 0, cache is not used
	// NOTE: if Object sizes are larger than 1/1024 of the cache size, the entry will not be written to the cache
	// Also refer to the freecache issue tracker (https://github.com/coocood/freecache/issues) to understand how to set the cache
	MaxSizeMegabytes int `` /* 149-byte string literal not displayed */
	// sets the garbage collection target percentage:
	// a collection is triggered when the ratio of freshly allocated data
	// to live data remaining after the previous collection reaches this percentage.
	// refer to https://pkg.go.dev/runtime/debug#SetGCPercent
	// If not specified or set to 0, GC percent is not tweaked
	TargetGCPercent int `json:"target_gc_percent" pflag:",Sets the garbage collection target percentage."`

type ComposedProtobufStore

type ComposedProtobufStore interface {

ComposedProtobufStore interface includes all the necessary data to allow a ProtobufStore to interact with storage through a RawStore.

type Config

type Config struct {
	Type Type `json:"type" pflag:",Sets the type of storage to configure [s3/minio/local/mem/stow]."`
	// Deprecated: Please use StowConfig instead
	Connection ConnectionConfig `json:"connection"`
	Stow       StowConfig       `json:"stow,omitempty" pflag:",Storage config for stow backend."`

	// Container here is misleading, it refers to a Bucket (AWS S3) like blobstore entity. In some terms it could be a table
	InitContainer string `json:"container" pflag:",Initial container (in s3 a bucket) to create -if it doesn't exist-.'"`
	// By default, if this is not enabled, multiple containers are not supported by the storage layer. Only the configured `container` (InitContainer) will be allowed to request data from. But if enabled, data will be loaded from or written to any
	// container specified in the DataReference.
	MultiContainerEnabled bool `` /* 213-byte string literal not displayed */
	// Caching is recommended to improve the performance of underlying systems. It caches the metadata and resolving
	// inputs is accelerated. The size of the cache is large so understand how to configure the cache.
	// TODO provide some default config choices
	// If this section is skipped, Caching is disabled
	Cache             CachingConfig    `json:"cache"`
	Limits            LimitsConfig     `json:"limits" pflag:",Sets limits for stores."`
	DefaultHTTPClient HTTPClientConfig `json:"defaultHttpClient" pflag:",Sets the default http client config."`
	SignedURL         SignedURLConfig  `json:"signedUrl" pflag:",Sets config for SignedURL."`

Config is a common storage config.

func GetConfig

func GetConfig() *Config

GetConfig retrieves the current global config for storage.

func (Config) GetPFlagSet

func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet

GetPFlagSet will return strongly typed pflags for all fields in Config and its nested types. The format of the flags is json-name.json-sub-name... etc.

type ConnectionConfig

type ConnectionConfig struct {
	Endpoint   config.URL `json:"endpoint" pflag:",URL for storage client to connect to."`
	AuthType   string     `json:"auth-type" pflag:",Auth Type to use [iam,accesskey]."`
	AccessKey  string     `json:"access-key" pflag:",Access key to use. Only required when authtype is set to accesskey."`
	SecretKey  string     `json:"secret-key" pflag:",Secret to use when accesskey is set."`
	Region     string     `json:"region" pflag:",Region to connect to."`
	DisableSSL bool       `json:"disable-ssl" pflag:",Disables SSL connection. Should only be used for development."`

ConnectionConfig defines connection configurations.

type DataReference

type DataReference string

DataReference defines a reference to data location.

func (DataReference) Split

func (r DataReference) Split() (scheme, container, key string, err error)

Split splits the data reference into parts.

func (DataReference) String

func (r DataReference) String() string

type DataStore

type DataStore struct {
	// contains filtered or unexported fields

DataStore is a simplified interface for accessing and storing data in one of the Cloud stores. Today we rely on Stow for multi-cloud support, but this interface abstracts that part.

func NewCompositeDataStore

func NewCompositeDataStore(refConstructor ReferenceConstructor, composedProtobufStore ComposedProtobufStore) *DataStore

NewCompositeDataStore composes a new DataStore.

func NewDataStore

func NewDataStore(cfg *Config, scope promutils.Scope) (s *DataStore, err error)

NewDataStore creates a new Data Store with the supplied config.

testScope := promutils.NewTestScope()
ctx := context.Background()
fmt.Println("Creating in memory data store.")
store, err := NewDataStore(&Config{
	Type: TypeMemory,
}, testScope.NewSubScope("exp_new"))

if err != nil {
	fmt.Printf("Failed to create data store. Error: %v", err)

ref, err := store.ConstructReference(ctx, DataReference("root"), "subkey", "subkey2")
if err != nil {
	fmt.Printf("Failed to construct data reference. Error: %v", err)

fmt.Printf("Constructed data reference [%v] and writing data to it.\n", ref)

dataToStore := "hello world"
err = store.WriteRaw(ctx, ref, int64(len(dataToStore)), Options{}, strings.NewReader(dataToStore))
if err != nil {
	fmt.Printf("Failed to write data. Error: %v", err)

Creating in memory data store.
Constructed data reference [/root/subkey/subkey2] and writing data to it.

func NewDataStoreWithContext

func NewDataStoreWithContext(ctx context.Context, cfg *Config, scope promutils.Scope) (s *DataStore, err error)

NewDataStoreWithContext creates a new Data Store with the supplied config and context.

func (*DataStore) RefreshConfig

func (ds *DataStore) RefreshConfig(ctx context.Context, cfg *Config) error

RefreshConfig re-initialises the data store client leaving metrics untouched. This is NOT thread-safe!

type DefaultProtobufStore

type DefaultProtobufStore struct {
	// contains filtered or unexported fields

Implements ProtobufStore to marshal and unmarshal protobufs to/from a RawStore

func NewDefaultProtobufStore

func NewDefaultProtobufStore(store RawStore, scope promutils.Scope) DefaultProtobufStore

func NewDefaultProtobufStoreWithMetrics

func NewDefaultProtobufStoreWithMetrics(store RawStore, metrics *protoMetrics) DefaultProtobufStore

func (DefaultProtobufStore) ReadProtobuf

func (s DefaultProtobufStore) ReadProtobuf(ctx context.Context, reference DataReference, msg proto.Message) error

func (DefaultProtobufStore) WriteProtobuf

func (s DefaultProtobufStore) WriteProtobuf(ctx context.Context, reference DataReference, opts Options, msg proto.Message) error

type HTTPClientConfig

type HTTPClientConfig struct {
	Headers map[string][]string `json:"headers" pflag:"-,Sets http headers to set on the http client."`
	Timeout config.Duration     `json:"timeout" pflag:",Sets time out on the http client."`

HTTPClientConfig encapsulates common settings that can be applied to an HTTP Client.

type InMemoryStore

type InMemoryStore struct {
	// contains filtered or unexported fields

func (*InMemoryStore) Clear

func (s *InMemoryStore) Clear(ctx context.Context) error

func (InMemoryStore) CopyRaw

func (c InMemoryStore) CopyRaw(ctx context.Context, source, destination DataReference, opts Options) error

CopyRaw is a naive implementation for copy that reads all data locally then writes them to destination. TODO: We should upstream an API change to stow to implement copy more natively, e.g. by using the S3 CopyObject API.

func (*InMemoryStore) CreateSignedURL

func (s *InMemoryStore) CreateSignedURL(ctx context.Context, reference DataReference, properties SignedURLProperties) (SignedURLResponse, error)

CreateSignedURL creates a signed url with the provided properties.

func (*InMemoryStore) Delete

func (s *InMemoryStore) Delete(ctx context.Context, reference DataReference) error

Delete removes the referenced data from the cache map.

func (*InMemoryStore) GetBaseContainerFQN

func (s *InMemoryStore) GetBaseContainerFQN(ctx context.Context) DataReference

func (*InMemoryStore) Head

func (s *InMemoryStore) Head(ctx context.Context, reference DataReference) (Metadata, error)

func (*InMemoryStore) ReadRaw

func (s *InMemoryStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error)

func (*InMemoryStore) WriteRaw

func (s *InMemoryStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) (
	err error)

type LimitsConfig

type LimitsConfig struct {
	GetLimitMegabytes int64 `json:"maxDownloadMBs" pflag:",Maximum allowed download size (in MBs) per call."`

LimitsConfig specifies limits for storage package.

type MemoryMetadata

type MemoryMetadata struct {
	// contains filtered or unexported fields

func (MemoryMetadata) ContentMD5 added in v1.11.0

func (m MemoryMetadata) ContentMD5() string

func (MemoryMetadata) Etag

func (m MemoryMetadata) Etag() string

func (MemoryMetadata) Exists

func (m MemoryMetadata) Exists() bool

func (MemoryMetadata) Size

func (m MemoryMetadata) Size() int64

type Metadata

type Metadata interface {
	Exists() bool
	Size() int64
	Etag() string
	// ContentMD5 retrieves the value of a special metadata tag added by the system that
	// contains the MD5 of the uploaded file. If there is no metadata attached
	// or that `FlyteContentMD5` key isn't set, ContentMD5 will return empty.
	ContentMD5() string

Metadata is a placeholder for data reference metadata.

type Options

type Options struct {
	Metadata map[string]interface{}

Options holds storage options. It is used to pass Metadata (like headers for S3) and also tags or labels for objects

type ProtobufStore

type ProtobufStore interface {
	// ReadProtobuf retrieves the entire blob from blobstore and unmarshals it to the passed protobuf
	ReadProtobuf(ctx context.Context, reference DataReference, msg proto.Message) error

	// WriteProtobuf serializes and stores the protobuf.
	WriteProtobuf(ctx context.Context, reference DataReference, opts Options, msg proto.Message) error

ProtobufStore defines an interface for reading and writing protobuf messages

type RawStore

type RawStore interface {
	// GetBaseContainerFQN returns a FQN DataReference with the configured base init container
	GetBaseContainerFQN(ctx context.Context) DataReference

	// CreateSignedURL creates a signed url with the provided properties.
	CreateSignedURL(ctx context.Context, reference DataReference, properties SignedURLProperties) (SignedURLResponse, error)

	// Head gets metadata about the reference. This should generally be a light weight operation.
	Head(ctx context.Context, reference DataReference) (Metadata, error)

	// ReadRaw retrieves a byte array from the Blob store or an error
	ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error)

	// WriteRaw stores a raw byte array.
	WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error

	// CopyRaw copies from source to destination.
	CopyRaw(ctx context.Context, source, destination DataReference, opts Options) error

	// Delete removes the referenced data from the blob store.
	Delete(ctx context.Context, reference DataReference) error

RawStore defines a low level interface for accessing and storing bytes.

func NewInMemoryRawStore

func NewInMemoryRawStore(_ context.Context, _ *Config, metrics *dataStoreMetrics) (RawStore, error)

type ReferenceConstructor

type ReferenceConstructor interface {
	// ConstructReference creates a new dataReference that matches the storage structure.
	ConstructReference(ctx context.Context, reference DataReference, nestedKeys ...string) (DataReference, error)

ReferenceConstructor defines an interface for building data reference paths.

type SignedURLConfig

type SignedURLConfig struct {
	StowConfigOverride map[string]string `json:"stowConfigOverride,omitempty" pflag:"-,Configuration for stow backend. Refer to github/flyteorg/stow"`

SignedURLConfig encapsulates configs specifically used for SignedURL behavior.

type SignedURLProperties

type SignedURLProperties struct {
	// Scope defines the permission level allowed for the generated URL.
	Scope stow.ClientMethod
	// ExpiresIn defines the expiration duration for the URL. It's strongly recommended setting it.
	ExpiresIn time.Duration
	// ContentMD5 defines the expected hash of the generated file. It's strongly recommended setting it.
	ContentMD5 string
	// AddContentMD5Metadata Add ContentMD5 to the metadata of signed URL if true.
	AddContentMD5Metadata bool

SignedURLProperties encapsulates properties about the signedURL operation.

type SignedURLResponse

type SignedURLResponse struct {
	URL                    url.URL
	RequiredRequestHeaders map[string]string

type StowConfig

type StowConfig struct {
	Kind   string            `json:"kind,omitempty" pflag:",Kind of Stow backend to use. Refer to github/flyteorg/stow"`
	Config map[string]string `json:"config,omitempty" pflag:",Configuration for stow backend. Refer to github/flyteorg/stow"`

StowConfig defines configs for stow as defined in github.com/flyteorg/stow.

type StowMetadata

type StowMetadata struct {
	// contains filtered or unexported fields

StowMetadata is the metadata that will be returned for data accessed through a stow-backed store.

func (StowMetadata) ContentMD5 added in v1.11.0

func (s StowMetadata) ContentMD5() string

func (StowMetadata) Etag

func (s StowMetadata) Etag() string

func (StowMetadata) Exists

func (s StowMetadata) Exists() bool

func (StowMetadata) Size

func (s StowMetadata) Size() int64

type StowStore

type StowStore struct {
	// contains filtered or unexported fields

StowStore implements RawStore to talk to a stow location store.

func NewStowRawStore

func NewStowRawStore(baseContainerFQN DataReference, loc, signedURLLoc stow.Location, enableDynamicContainerLoading bool, metrics *dataStoreMetrics) (*StowStore, error)

func (StowStore) CopyRaw

func (c StowStore) CopyRaw(ctx context.Context, source, destination DataReference, opts Options) error

CopyRaw is a naive implementation for copy that reads all data locally then writes them to destination. TODO: We should upstream an API change to stow to implement copy more natively, e.g. by using the S3 CopyObject API.

func (*StowStore) CreateContainer

func (s *StowStore) CreateContainer(ctx context.Context, container string) (stow.Container, error)

func (*StowStore) CreateSignedURL

func (s *StowStore) CreateSignedURL(ctx context.Context, reference DataReference, properties SignedURLProperties) (SignedURLResponse, error)

func (*StowStore) Delete

func (s *StowStore) Delete(ctx context.Context, reference DataReference) error

Delete removes the referenced data from the blob store.

func (*StowStore) GetBaseContainerFQN

func (s *StowStore) GetBaseContainerFQN(ctx context.Context) DataReference

func (*StowStore) Head

func (s *StowStore) Head(ctx context.Context, reference DataReference) (Metadata, error)

func (*StowStore) LoadContainer

func (s *StowStore) LoadContainer(ctx context.Context, container string, createIfNotFound bool) (stow.Container, error)

func (*StowStore) ReadRaw

func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error)

func (*StowStore) WriteRaw

func (s *StowStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error

type Type

type Type = string

Type defines the storage config type.

const (
	TypeMemory Type = "mem"
	TypeS3     Type = "s3"
	TypeLocal  Type = "local"
	TypeMinio  Type = "minio"
	TypeStow   Type = "stow"

type URLPathConstructor

type URLPathConstructor struct {

URLPathConstructor implements ReferenceConstructor that assumes paths are URL-compatible.

func NewURLPathConstructor

func NewURLPathConstructor() URLPathConstructor

func (URLPathConstructor) ConstructReference

func (URLPathConstructor) ConstructReference(ctx context.Context, reference DataReference, nestedKeys ...string) (DataReference, error)


Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL