fileservice

package
v1.1.3
Published: Apr 15, 2024 License: Apache-2.0 Imports: 70 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SkipMemoryCacheReads = 1 << iota
	SkipMemoryCacheWrites
	SkipDiskCacheReads
	SkipDiskCacheWrites
	SkipFullFilePreloads
)
View Source
const DisableCacheCapacity = 1
View Source
const ServiceNameSeparator = ":"

Variables

View Source
var CtxKeyDiskCacheCallbacks ctxKeyDiskCacheCallbacks
View Source
var CtxKeyStatementProfiler ctxKeyStatementProfiler
View Source
var DefaultCacheDataAllocator = new(bytesAllocator)
View Source
var DisabledCacheConfig = CacheConfig{
	MemoryCapacity: ptrTo[toml.ByteSize](DisableCacheCapacity),
	DiskCapacity:   ptrTo[toml.ByteSize](DisableCacheCapacity),
}
View Source
var ErrNotCacheFile = errorStr("not a cache file")
View Source
var FSProfileHandler = NewProfileHandler()
View Source
var NoDefaultCredentialsForETL = os.Getenv("MO_NO_DEFAULT_CREDENTIALS") != ""
View Source
var PerStatementProfileDir = os.Getenv("PER_STMT_PROFILE_DIR")
View Source
var PerStatementProfileThreshold = func() time.Duration {
	str := os.Getenv("PER_STMT_PROFILE_THRESHOLD_MSEC")
	if str == "" {
		return time.Millisecond * 500
	}
	n, err := strconv.Atoi(str)
	if err != nil {
		panic(err)
	}
	return time.Millisecond * time.Duration(n)
}()
View Source
var RCBytesPool = func() *rcBytesPool {
	ret := &rcBytesPool{}
	for size := rcBytesPoolMinCap; size <= rcBytesPoolMaxCap; size *= 2 {
		size := size
		ret.sizes = append(ret.sizes, size)
		ret.pools = append(ret.pools, NewRCPool(func() []byte {
			return make([]byte, size)
		}))
	}
	return ret
}()

RCBytesPool is the global RCBytes pool
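
A minimal sketch of allocating cache data through the package-level allocator, assuming DefaultCacheDataAllocator satisfies the CacheDataAllocator interface defined below:

	cacheData := fileservice.DefaultCacheDataAllocator.Alloc(4096) // assumed to return a CacheData of the requested size
	buf := cacheData.Bytes()                                       // backing byte slice
	_ = buf
	cacheData.Release() // release when no other owner retains it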

Functions

func EnsureStatementProfiler added in v1.0.0

func EnsureStatementProfiler(ctx context.Context, from context.Context) context.Context

EnsureStatementProfiler ensures a statement profiler is set in the context; if not, it copies one from another context

func Get added in v0.6.0

func Get[T any](fs FileService, name string) (res T, err error)
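
Get presumably looks up a sub-service of fs by name and asserts it to T. A hedged sketch, assuming fs is a *FileServices holding a service registered under the illustrative name "S3" that implements ETLFileService:

	etlFS, err := fileservice.Get[fileservice.ETLFileService](fs, "S3")
	if err != nil {
		// handle error: no service with that name, or it does not implement ETLFileService
	}
	_ = etlFS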

func HandleRemoteRead added in v1.0.0

func HandleRemoteRead(
	ctx context.Context, fs FileService, req *pb.Request, resp *pb.CacheResponse,
) error

func JoinPath added in v0.6.0

func JoinPath(serviceName string, path string) string
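
JoinPath joins a service name and a file path with ServiceNameSeparator; a small illustration (the expected result is inferred from the FilePath format documented on IOVector):

	fullPath := fileservice.JoinPath("s3", "a/b/c")
	// expected: "s3:a/b/c"
	_ = fullPath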

func NewFileWithChecksumOSFile added in v0.8.0

func NewFileWithChecksumOSFile(
	ctx context.Context,
	underlying *os.File,
	blockContentSize int,
	perfCounterSets []*perfcounter.CounterSet,
) (*FileWithChecksum[*os.File], PutBack[*FileWithChecksum[*os.File]])

func NewLRUCache added in v1.0.0

func NewLRUCache(
	capacity int64,
	checkOverlaps bool,
	callbacks *CacheCallbacks,
) *lrucache.LRU[CacheKey, CacheData]

func NewStatementProfiler added in v1.0.0

func NewStatementProfiler(
	ctx context.Context,
) (
	newCtx context.Context,
	end func(
		fileSuffixFunc func() string,
	),
)
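
A hedged call sketch; the suffix returned by fileSuffixFunc presumably becomes part of the written profile file name:

	newCtx, end := fileservice.NewStatementProfiler(ctx)
	defer end(func() string {
		return "stmt-123" // hypothetical file name suffix
	})
	// ... execute the statement using newCtx ...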

func OnDiskCacheEvict added in v1.0.0

func OnDiskCacheEvict(ctx context.Context, fn OnDiskCacheEvictFunc) (ret context.Context)

func OnDiskCacheWritten added in v1.0.0

func OnDiskCacheWritten(ctx context.Context, fn OnDiskCacheWrittenFunc) (ret context.Context)

func StatementProfileNewSpan added in v1.0.0

func StatementProfileNewSpan(
	ctx context.Context,
) (
	_ context.Context,
	end func(),
)

StatementProfileNewSpan returns a context.Context and an end func(). It returns the input ctx and a nil function if ctx.Value(CtxKeyStatementProfiler) is nil.

Types

type AliyunSDK added in v1.0.0

type AliyunSDK struct {
	// contains filtered or unexported fields
}

func NewAliyunSDK added in v1.0.0

func NewAliyunSDK(
	ctx context.Context,
	args ObjectStorageArguments,
	perfCounterSets []*perfcounter.CounterSet,
) (_ *AliyunSDK, err error)

func (*AliyunSDK) Delete added in v1.0.0

func (a *AliyunSDK) Delete(
	ctx context.Context,
	keys ...string,
) (
	err error,
)

func (*AliyunSDK) Exists added in v1.0.0

func (a *AliyunSDK) Exists(
	ctx context.Context,
	key string,
) (
	bool,
	error,
)

func (*AliyunSDK) List added in v1.0.0

func (a *AliyunSDK) List(
	ctx context.Context,
	prefix string,
	fn func(bool, string, int64) (bool, error),
) error

func (*AliyunSDK) Read added in v1.0.0

func (a *AliyunSDK) Read(
	ctx context.Context,
	key string,
	min *int64,
	max *int64,
) (
	r io.ReadCloser,
	err error,
)

func (*AliyunSDK) Stat added in v1.0.0

func (a *AliyunSDK) Stat(
	ctx context.Context,
	key string,
) (
	size int64,
	err error,
)

func (*AliyunSDK) Write added in v1.0.0

func (a *AliyunSDK) Write(
	ctx context.Context,
	key string,
	r io.Reader,
	size int64,
	expire *time.Time,
) (
	err error,
)

type AwsSDKv1 added in v1.0.0

type AwsSDKv1 struct {
	// contains filtered or unexported fields
}

func NewAwsSDKv1 added in v1.0.0

func NewAwsSDKv1(
	ctx context.Context,
	args ObjectStorageArguments,
	perfCounterSets []*perfcounter.CounterSet,
) (*AwsSDKv1, error)

func (*AwsSDKv1) Delete added in v1.0.0

func (a *AwsSDKv1) Delete(
	ctx context.Context,
	keys ...string,
) (
	err error,
)

func (*AwsSDKv1) Exists added in v1.0.0

func (a *AwsSDKv1) Exists(
	ctx context.Context,
	key string,
) (
	bool,
	error,
)

func (*AwsSDKv1) List added in v1.0.0

func (a *AwsSDKv1) List(
	ctx context.Context,
	prefix string,
	fn func(bool, string, int64) (bool, error),
) error

func (*AwsSDKv1) Read added in v1.0.0

func (a *AwsSDKv1) Read(
	ctx context.Context,
	key string,
	min *int64,
	max *int64,
) (
	r io.ReadCloser,
	err error,
)

func (*AwsSDKv1) Stat added in v1.0.0

func (a *AwsSDKv1) Stat(
	ctx context.Context,
	key string,
) (
	size int64,
	err error,
)

func (*AwsSDKv1) Write added in v1.0.0

func (a *AwsSDKv1) Write(
	ctx context.Context,
	key string,
	r io.Reader,
	size int64,
	expire *time.Time,
) (
	err error,
)

type AwsSDKv2 added in v1.0.0

type AwsSDKv2 struct {
	// contains filtered or unexported fields
}

func NewAwsSDKv2 added in v1.0.0

func NewAwsSDKv2(
	ctx context.Context,
	args ObjectStorageArguments,
	perfCounterSets []*perfcounter.CounterSet,
) (*AwsSDKv2, error)

func (*AwsSDKv2) Delete added in v1.0.0

func (a *AwsSDKv2) Delete(
	ctx context.Context,
	keys ...string,
) (
	err error,
)

func (*AwsSDKv2) Exists added in v1.0.0

func (a *AwsSDKv2) Exists(
	ctx context.Context,
	key string,
) (
	bool,
	error,
)

func (*AwsSDKv2) List added in v1.0.0

func (a *AwsSDKv2) List(
	ctx context.Context,
	prefix string,
	fn func(bool, string, int64) (bool, error),
) error

func (*AwsSDKv2) Read added in v1.0.0

func (a *AwsSDKv2) Read(
	ctx context.Context,
	key string,
	min *int64,
	max *int64,
) (
	r io.ReadCloser,
	err error,
)

func (*AwsSDKv2) Stat added in v1.0.0

func (a *AwsSDKv2) Stat(
	ctx context.Context,
	key string,
) (
	size int64,
	err error,
)

func (*AwsSDKv2) Write added in v1.0.0

func (a *AwsSDKv2) Write(
	ctx context.Context,
	key string,
	r io.Reader,
	size int64,
	expire *time.Time,
) (
	err error,
)

type Bytes added in v1.0.0

type Bytes []byte

func (Bytes) Bytes added in v1.0.0

func (b Bytes) Bytes() []byte

func (Bytes) Release added in v1.0.0

func (b Bytes) Release()

func (Bytes) Retain added in v1.0.0

func (b Bytes) Retain()

func (Bytes) Size added in v1.0.0

func (b Bytes) Size() int64

func (Bytes) Slice added in v1.0.0

func (b Bytes) Slice(length int) CacheData

type CacheCallbackFunc added in v1.0.0

type CacheCallbackFunc = func(CacheKey, CacheData)

type CacheCallbacks added in v1.0.0

type CacheCallbacks struct {
	PostGet   []CacheCallbackFunc
	PostSet   []CacheCallbackFunc
	PostEvict []CacheCallbackFunc
}

type CacheConfig added in v0.6.0

type CacheConfig struct {
	MemoryCapacity       *toml.ByteSize `toml:"memory-capacity" user_setting:"advanced"`
	DiskPath             *string        `toml:"disk-path"`
	DiskCapacity         *toml.ByteSize `toml:"disk-capacity"`
	DiskMinEvictInterval *toml.Duration `toml:"disk-min-evict-interval"`
	DiskEvictTarget      *float64       `toml:"disk-evict-target"`
	RemoteCacheEnabled   bool           `toml:"remote-cache-enabled"`
	RPC                  morpc.Config   `toml:"rpc"`

	CacheClient      client.CacheClient `json:"-"`
	KeyRouterFactory KeyRouterFactory   `json:"-"`
	KeyRouter        KeyRouter          `json:"-"`
	InitKeyRouter    *sync.Once         `json:"-"`
	CacheCallbacks   `json:"-"`
	// contains filtered or unexported fields
}

func (*CacheConfig) SetRemoteCacheCallback added in v1.0.0

func (c *CacheConfig) SetRemoteCacheCallback()

type CacheData added in v1.0.0

type CacheData interface {
	Bytes() []byte
	Slice(length int) CacheData
	Release()
	Retain()
}

func CacheOriginalData added in v1.0.0

func CacheOriginalData(r io.Reader, data []byte, allocator CacheDataAllocator) (cacheData CacheData, err error)
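
CacheOriginalData has the same signature as IOEntry.ToCacheData, so it can plausibly be used as that callback when the cached object should simply hold the entry's original bytes; a hedged sketch:

	entry := fileservice.IOEntry{
		Offset:      0,
		Size:        -1,                            // read to the end of the file
		ToCacheData: fileservice.CacheOriginalData, // cache the entry bytes as-is (assumed semantics)
	}
	_ = entry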

type CacheDataAllocator added in v1.0.0

type CacheDataAllocator interface {
	Alloc(size int) CacheData
}

type CacheKey added in v0.6.0

type CacheKey = pb.CacheKey

type CachingFileService added in v0.6.0

type CachingFileService interface {
	FileService

	// FlushCache flushes cache
	FlushCache()

	// SetAsyncUpdate sets cache update operation to async mode
	SetAsyncUpdate(bool)
}

CachingFileService is an extension to the FileService

type Config added in v0.6.0

type Config struct {
	// Name name of fileservice, describe what an instance of fileservice is used for
	Name string `toml:"name"`
	// Backend fileservice backend. [MEM|DISK|DISK-ETL|S3|MINIO]
	Backend string `toml:"backend"`
	// S3 used to create fileservice using s3 as the backend
	S3 ObjectStorageArguments `toml:"s3"`
	// Cache specifies configs for cache
	Cache CacheConfig `toml:"cache"`
	// DataDir used to create fileservice using DISK as the backend
	DataDir string `toml:"data-dir"`
	// FixMissing indicates that the file service should try its best to fix missing files
	FixMissing bool `toml:"fix-missing"`
}

Config is the fileservice config

type DataCache added in v1.0.0

type DataCache interface {
	Set(ctx context.Context, key CacheKey, value CacheData)
	Get(ctx context.Context, key CacheKey) (value CacheData, ok bool)
	//TODO: file contents may change, so we still need this.
	DeletePaths(ctx context.Context, paths []string)
	Flush()
	Capacity() int64
	Used() int64
	Available() int64
}

DataCache caches IOEntry.CachedData

type DirEntry

type DirEntry struct {
	// file name, not full path
	Name  string
	IsDir bool
	Size  int64
}

DirEntry is a file or dir

type DiskCache added in v0.7.0

type DiskCache struct {
	// contains filtered or unexported fields
}

func NewDiskCache added in v0.7.0

func NewDiskCache(
	ctx context.Context,
	path string,
	capacity int64,
	evictInterval time.Duration,
	evictTarget float64,
	perfCounterSets []*perfcounter.CounterSet,
) (*DiskCache, error)

func (*DiskCache) DeletePaths added in v1.1.0

func (d *DiskCache) DeletePaths(
	ctx context.Context,
	paths []string,
) error

func (*DiskCache) Flush added in v0.7.0

func (d *DiskCache) Flush()

func (*DiskCache) Read added in v0.7.0

func (d *DiskCache) Read(
	ctx context.Context,
	vector *IOVector,
) (
	err error,
)

func (*DiskCache) SetFile added in v1.0.0

func (d *DiskCache) SetFile(
	ctx context.Context,
	path string,
	openReader func(context.Context) (io.ReadCloser, error),
) error

func (*DiskCache) Update added in v0.7.0

func (d *DiskCache) Update(
	ctx context.Context,
	vector *IOVector,
	async bool,
) (
	err error,
)

type DiskCacheCallbacks added in v1.0.0

type DiskCacheCallbacks struct {
	OnWritten []OnDiskCacheWrittenFunc
	OnEvict   []OnDiskCacheEvictFunc
}

type ETLFileService added in v0.6.0

type ETLFileService interface {
	FileService

	// ETLCompatible marks the implementation as compatible with ETL operations
	// implementations must save file contents as-is
	ETLCompatible()
}

ETLFileService is an extension to the FileService

func GetForETL added in v0.6.0

func GetForETL(ctx context.Context, fs FileService, path string) (res ETLFileService, readPath string, err error)

GetForETL gets or creates a FileService instance for ETL operations.
If the service part of the path is empty, a LocalETLFS will be created.
If the service part of the path is not empty, an ETLFileService typed instance will be extracted from the fs argument.
If the service part of the path is argumented, a FileService instance will be created dynamically with those arguments.
Supported dynamic file services:
s3,<endpoint>,<region>,<bucket>,<key>,<secret>,<prefix>
s3-no-key,<endpoint>,<region>,<bucket>,<prefix>
minio,<endpoint>,<region>,<bucket>,<key>,<secret>,<prefix>
s3-opts,endpoint=<endpoint>,region=<region>,bucket=<bucket>,key=<key>,secret=<secret>,prefix=<prefix>,role-arn=<role arn>,external-id=<external id>

key value pairs can be in any order

type FileCache added in v1.0.0

type FileCache interface {
	SetFile(
		ctx context.Context,
		path string,
		openReader func(context.Context) (io.ReadCloser, error),
	) error
}

type FileLike added in v0.6.0

type FileLike interface {
	io.ReadWriteSeeker
	io.WriterAt
	io.ReaderAt
}

type FileService

type FileService interface {
	// Name is file service's name
	// service name is case-insensitive
	Name() string

	// Write writes a new file
	// returns ErrFileExisted if file already existed
	// returns ErrSizeNotMatch if provided size does not match data
	// entries in vector should be written atomically; if the write fails, subsequent reads must not succeed
	Write(ctx context.Context, vector IOVector) error

	// Read reads a file to fill IOEntries
	// returns ErrFileNotFound if requested file not found
	// returns ErrUnexpectedEOF if less data is read than requested size
	// returns ErrEmptyRange if no data at specified offset and size
	// returns ErrEmptyVector if no IOEntry is passed
	Read(ctx context.Context, vector *IOVector) error

	// ReadCache reads cached data if any
	// if cache hit, IOEntry.CachedData will be set
	ReadCache(ctx context.Context, vector *IOVector) error

	// List lists sub-entries in a dir
	List(ctx context.Context, dirPath string) ([]DirEntry, error)

	// Delete deletes multiple files
	// returns ErrFileNotFound if requested file not found
	Delete(ctx context.Context, filePaths ...string) error

	// StatFile returns information about a file
	// returns ErrFileNotFound if requested file not found
	StatFile(ctx context.Context, filePath string) (*DirEntry, error)

	// PrefetchFile prefetches a file
	PrefetchFile(ctx context.Context, filePath string) error
}

FileService is a write-once file system
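
A minimal sketch of the write-once contract using an in-memory instance; the function name and file path are illustrative, and nil perf counter sets are assumed to be acceptable:

	func readBack(ctx context.Context) ([]byte, error) {
		fs, err := fileservice.NewMemoryFS("mem", fileservice.DisabledCacheConfig, nil)
		if err != nil {
			return nil, err
		}

		// write the whole file as a single entry
		err = fs.Write(ctx, fileservice.IOVector{
			FilePath: "path/to/file",
			Entries: []fileservice.IOEntry{
				{Offset: 0, Size: 5, Data: []byte("hello")},
			},
		})
		if err != nil {
			return nil, err
		}

		// read it back; Size -1 reads to the end of the file
		readVector := fileservice.IOVector{
			FilePath: "path/to/file",
			Entries:  []fileservice.IOEntry{{Offset: 0, Size: -1}},
		}
		if err := fs.Read(ctx, &readVector); err != nil {
			return nil, err
		}
		return readVector.Entries[0].Data, nil
	}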

func GetForBackup added in v1.0.0

func GetForBackup(ctx context.Context, spec string) (res FileService, err error)

GetForBackup creates a FileService instance for backup operations.
If the service part of the path is empty, a LocalFS will be created.
If the service part of the path is argumented, a FileService instance will be created dynamically with those arguments.
Supported dynamic file service:
s3-opts,endpoint=<endpoint>,region=<region>,bucket=<bucket>,key=<key>,secret=<secret>,prefix=<prefix>,role-arn=<role arn>,external-id=<external id>,is-minio=<is-minio>

func NewFileService added in v0.6.0

func NewFileService(
	ctx context.Context, cfg Config, perfCounterSets []*perfcounter.CounterSet,
) (FileService, error)

NewFileService creates a file service from config
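
A sketch of building a DISK-backed instance from a Config; the name and data directory are placeholders, and nil perf counter sets are assumed to be acceptable:

	cfg := fileservice.Config{
		Name:    "LOCAL",            // placeholder service name
		Backend: "DISK",             // one of MEM, DISK, DISK-ETL, S3, MINIO
		DataDir: "/var/lib/mo/data", // placeholder directory for the DISK backend
	}
	fs, err := fileservice.NewFileService(ctx, cfg, nil)
	if err != nil {
		// handle error
	}
	_ = fs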

func SubPath added in v0.8.0

func SubPath(upstream FileService, path string) FileService

SubPath returns a FileService instance that operates at the specified sub path of the upstream instance
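
For example, a component can be handed a sub-scoped instance so its paths stay relative (the sub path here is illustrative):

	sub := fileservice.SubPath(fs, "tables/t1")
	// reads and writes through sub resolve relative to "tables/t1" in the upstream instance
	_ = sub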

type FileServices added in v0.6.0

type FileServices struct {
	// contains filtered or unexported fields
}

func NewFileServices added in v0.6.0

func NewFileServices(defaultName string, fss ...FileService) (*FileServices, error)

func (*FileServices) Delete added in v0.6.0

func (f *FileServices) Delete(ctx context.Context, filePaths ...string) error

func (*FileServices) List added in v0.6.0

func (f *FileServices) List(ctx context.Context, dirPath string) ([]DirEntry, error)

func (*FileServices) Name added in v0.6.0

func (f *FileServices) Name() string

func (*FileServices) PrefetchFile added in v1.0.1

func (f *FileServices) PrefetchFile(ctx context.Context, filePath string) error

func (*FileServices) Read added in v0.6.0

func (f *FileServices) Read(ctx context.Context, vector *IOVector) error

func (*FileServices) ReadCache added in v1.0.0

func (f *FileServices) ReadCache(ctx context.Context, vector *IOVector) error

func (*FileServices) StatFile added in v0.7.0

func (f *FileServices) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*FileServices) Write added in v0.6.0

func (f *FileServices) Write(ctx context.Context, vector IOVector) error

type FileWithChecksum added in v0.6.0

type FileWithChecksum[T FileLike] struct {
	// contains filtered or unexported fields
}

FileWithChecksum maps file contents to blocks with checksum

func NewFileWithChecksum added in v0.6.0

func NewFileWithChecksum[T FileLike](
	ctx context.Context,
	underlying T,
	blockContentSize int,
	perfCounterSets []*perfcounter.CounterSet,
) *FileWithChecksum[T]

func (*FileWithChecksum[T]) Read added in v0.6.0

func (f *FileWithChecksum[T]) Read(buf []byte) (n int, err error)

func (*FileWithChecksum[T]) ReadAt added in v0.6.0

func (f *FileWithChecksum[T]) ReadAt(buf []byte, offset int64) (n int, err error)

func (*FileWithChecksum[T]) Seek added in v0.6.0

func (f *FileWithChecksum[T]) Seek(offset int64, whence int) (int64, error)

func (*FileWithChecksum[T]) Write added in v0.6.0

func (f *FileWithChecksum[T]) Write(buf []byte) (n int, err error)

func (*FileWithChecksum[T]) WriteAt added in v0.6.0

func (f *FileWithChecksum[T]) WriteAt(buf []byte, offset int64) (n int, err error)

type IOEntry

type IOEntry struct {
	// offset in file
	// when writing or mutating, offset can be an arbitrary value; gaps between provided data are zero-filled
	// when reading, valid offsets are in range [0, len(file) - 1]
	Offset int64

	// number of bytes to read or write, [1, len(file)]
	// when reading, pass -1 to read to the end of file
	Size int64

	// raw content
	// when reading, if len(Data) < Size, a new byte slice of length Size will be allocated
	Data []byte

	// when reading, if Writer is not nil, write data to it instead of setting Data field
	WriterForRead io.Writer

	// when reading, if ReadCloser is not nil, set an io.ReadCloser instead of setting Data field
	ReadCloserForRead *io.ReadCloser

	// when writing, if Reader is not nil, read data from it instead of reading Data field
	// number of bytes to be read is specified by Size field
	// if number of bytes is unknown, set Size field to -1
	ReaderForWrite io.Reader

	// When reading, if the ToCacheData field is not nil, the constructed cache object will be set to this field
	// Data, WriterForRead, ReadCloserForRead may be empty if CachedData is not nil
	// if ToCacheData is provided, the caller should always read CachedData instead of Data, WriterForRead or ReadCloserForRead
	CachedData CacheData

	// ToCacheData constructs an object byte slice from the entry contents
	// reader or data must not be retained after the call returns
	// reader always contains the entry contents
	// data may contain the entry contents if available
	// if data is empty, the io.Reader must be fully read before returning a nil error
	ToCacheData func(reader io.Reader, data []byte, allocator CacheDataAllocator) (cacheData CacheData, err error)
	// contains filtered or unexported fields
}

func (*IOEntry) ReadFromOSFile added in v0.8.0

func (e *IOEntry) ReadFromOSFile(file *os.File) error

type IOLockKey added in v1.0.0

type IOLockKey struct {
	File string
}

type IOLocks added in v1.0.0

type IOLocks struct {
	// contains filtered or unexported fields
}

func (*IOLocks) Lock added in v1.0.0

func (i *IOLocks) Lock(key IOLockKey) (unlock func(), wait func())

type IOVector

type IOVector struct {

	// FilePath indicates where to find the file
	// a path has two parts, service name and file name, separated by ':'
	// service name is optional, if omitted, the receiver FileService will use the default name of the service
	// file name parts are separated by '/'
	// valid characters in file name: 0-9 a-z A-Z / ! - _ . * ' ( )
	// and all printable non-ASCII characters
	// example:
	// s3:a/b/c and S3:a/b/c represent the same file 'a/b/c' located in the 'S3' service
	FilePath string

	// io entries
	// empty Entries is not allowed
	// when writing, overlapping Entries is not allowed
	Entries []IOEntry

	// ExpireAt specifies the expire time of the file
	// implementations may or may not delete the file after this time
	// zero value means no expire
	ExpireAt time.Time

	// Policy controls policy for the vector
	Policy Policy

	// Caches indicates extra caches to operate on
	Caches []IOVectorCache
}

func (*IOVector) Release added in v1.0.0

func (i *IOVector) Release()

type IOVectorCache added in v0.8.0

type IOVectorCache interface {
	Read(
		ctx context.Context,
		vector *IOVector,
	) error
	Update(
		ctx context.Context,
		vector *IOVector,
		async bool,
	) error
	Flush()
	//TODO: file contents may change, so we still need this.
	DeletePaths(
		ctx context.Context,
		paths []string,
	) error
}

IOVectorCache caches IOVectors

type KeyRouter added in v1.0.0

type KeyRouter interface {
	// Target returns the remote cache server service address of
	// the cache key. If the cache does not exist in any node, it
	// returns an empty string.
	Target(k CacheKey) string

	// AddItem pushes a cache key item into a queue with a local
	// cache server service address in the item. Gossip will take
	// all the items and send them to other nodes in gossip cluster.
	AddItem(key CacheKey, operation gpb.Operation)
}

KeyRouter is an interface that manages the remote cache information.

type KeyRouterFactory added in v1.0.0

type KeyRouterFactory func() KeyRouter

type LocalETLFS added in v0.6.0

type LocalETLFS struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

LocalETLFS is a FileService implementation backed by local file system and suitable for ETL operations

func NewLocalETLFS added in v0.6.0

func NewLocalETLFS(name string, rootPath string) (*LocalETLFS, error)

func (*LocalETLFS) Delete added in v0.6.0

func (l *LocalETLFS) Delete(ctx context.Context, filePaths ...string) error

func (*LocalETLFS) ETLCompatible added in v0.6.0

func (l *LocalETLFS) ETLCompatible()

func (*LocalETLFS) List added in v0.6.0

func (l *LocalETLFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error)

func (*LocalETLFS) Name added in v0.6.0

func (l *LocalETLFS) Name() string

func (*LocalETLFS) NewMutator added in v0.6.0

func (l *LocalETLFS) NewMutator(ctx context.Context, filePath string) (Mutator, error)

func (*LocalETLFS) PrefetchFile added in v1.0.1

func (l *LocalETLFS) PrefetchFile(ctx context.Context, filePath string) error

func (*LocalETLFS) Read added in v0.6.0

func (l *LocalETLFS) Read(ctx context.Context, vector *IOVector) error

func (*LocalETLFS) ReadCache added in v1.0.0

func (l *LocalETLFS) ReadCache(ctx context.Context, vector *IOVector) error

func (*LocalETLFS) StatFile added in v0.7.0

func (l *LocalETLFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*LocalETLFS) Write added in v0.6.0

func (l *LocalETLFS) Write(ctx context.Context, vector IOVector) error

type LocalETLFSMutator added in v0.6.0

type LocalETLFSMutator struct {
	// contains filtered or unexported fields
}

func (*LocalETLFSMutator) Append added in v0.6.0

func (l *LocalETLFSMutator) Append(ctx context.Context, entries ...IOEntry) error

func (*LocalETLFSMutator) Close added in v0.6.0

func (l *LocalETLFSMutator) Close() error

func (*LocalETLFSMutator) Mutate added in v0.6.0

func (l *LocalETLFSMutator) Mutate(ctx context.Context, entries ...IOEntry) error

type LocalFS

type LocalFS struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

LocalFS is a FileService implementation backed by local file system

func NewLocalFS

func NewLocalFS(
	ctx context.Context,
	name string,
	rootPath string,
	cacheConfig CacheConfig,
	perfCounterSets []*perfcounter.CounterSet,
) (*LocalFS, error)
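
A sketch of constructing a LocalFS directly, with caching disabled via DisabledCacheConfig; the name and root path are placeholders:

	localFS, err := fileservice.NewLocalFS(
		ctx,
		"local",                         // service name (placeholder)
		"/var/lib/mo/local",             // root path (placeholder)
		fileservice.DisabledCacheConfig, // disable memory and disk caches
		nil,                             // perf counter sets
	)
	if err != nil {
		// handle error
	}
	_ = localFS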

func (*LocalFS) Delete

func (l *LocalFS) Delete(ctx context.Context, filePaths ...string) error

func (*LocalFS) FlushCache added in v0.6.0

func (l *LocalFS) FlushCache()

func (*LocalFS) List

func (l *LocalFS) List(ctx context.Context, dirPath string) (ret []DirEntry, err error)

func (*LocalFS) Name added in v0.6.0

func (l *LocalFS) Name() string

func (*LocalFS) NewMutator added in v0.6.0

func (l *LocalFS) NewMutator(ctx context.Context, filePath string) (Mutator, error)

func (*LocalFS) PrefetchFile added in v1.0.1

func (l *LocalFS) PrefetchFile(ctx context.Context, filePath string) error

func (*LocalFS) Read

func (l *LocalFS) Read(ctx context.Context, vector *IOVector) (err error)

func (*LocalFS) ReadCache added in v1.0.0

func (l *LocalFS) ReadCache(ctx context.Context, vector *IOVector) (err error)

func (*LocalFS) Replace added in v0.6.0

func (l *LocalFS) Replace(ctx context.Context, vector IOVector) error

func (*LocalFS) SetAsyncUpdate added in v0.8.0

func (l *LocalFS) SetAsyncUpdate(b bool)

func (*LocalFS) StatFile added in v0.7.0

func (l *LocalFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*LocalFS) Write

func (l *LocalFS) Write(ctx context.Context, vector IOVector) error

type LocalFSMutator added in v0.6.0

type LocalFSMutator struct {
	// contains filtered or unexported fields
}

func (*LocalFSMutator) Append added in v0.6.0

func (l *LocalFSMutator) Append(ctx context.Context, entries ...IOEntry) error

func (*LocalFSMutator) Close added in v0.6.0

func (l *LocalFSMutator) Close() error

func (*LocalFSMutator) Mutate added in v0.6.0

func (l *LocalFSMutator) Mutate(ctx context.Context, entries ...IOEntry) error

type MemCache added in v0.6.0

type MemCache struct {
	// contains filtered or unexported fields
}

func NewMemCache added in v0.6.0

func NewMemCache(
	dataCache DataCache,
	counterSets []*perfcounter.CounterSet,
) *MemCache

func (*MemCache) DeletePaths added in v1.1.0

func (m *MemCache) DeletePaths(
	ctx context.Context,
	paths []string,
) error

func (*MemCache) Flush added in v0.6.0

func (m *MemCache) Flush()

func (*MemCache) Read added in v0.6.0

func (m *MemCache) Read(
	ctx context.Context,
	vector *IOVector,
) (
	err error,
)

func (*MemCache) Update added in v0.7.0

func (m *MemCache) Update(
	ctx context.Context,
	vector *IOVector,
	async bool,
) error

type MemoryFS

type MemoryFS struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MemoryFS is an in-memory FileService implementation

func NewMemoryFS

func NewMemoryFS(
	name string,
	cacheConfig CacheConfig,
	perfCounterSets []*perfcounter.CounterSet,
) (*MemoryFS, error)

func (*MemoryFS) Delete

func (m *MemoryFS) Delete(ctx context.Context, filePaths ...string) error

func (*MemoryFS) ETLCompatible added in v0.6.0

func (m *MemoryFS) ETLCompatible()

func (*MemoryFS) List

func (m *MemoryFS) List(ctx context.Context, dirPath string) (entries []DirEntry, err error)

func (*MemoryFS) Name added in v0.6.0

func (m *MemoryFS) Name() string

func (*MemoryFS) PrefetchFile added in v1.0.1

func (m *MemoryFS) PrefetchFile(ctx context.Context, filePath string) error

func (*MemoryFS) Read

func (m *MemoryFS) Read(ctx context.Context, vector *IOVector) (err error)

func (*MemoryFS) ReadCache added in v1.0.0

func (m *MemoryFS) ReadCache(ctx context.Context, vector *IOVector) (err error)

func (*MemoryFS) Replace added in v0.6.0

func (m *MemoryFS) Replace(ctx context.Context, vector IOVector) error

func (*MemoryFS) StatFile added in v0.7.0

func (m *MemoryFS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*MemoryFS) Write

func (m *MemoryFS) Write(ctx context.Context, vector IOVector) error

type MinioSDK added in v1.0.0

type MinioSDK struct {
	// contains filtered or unexported fields
}

func NewMinioSDK added in v1.0.0

func NewMinioSDK(
	ctx context.Context,
	args ObjectStorageArguments,
	perfCounterSets []*perfcounter.CounterSet,
) (*MinioSDK, error)

func (*MinioSDK) Delete added in v1.0.0

func (a *MinioSDK) Delete(
	ctx context.Context,
	keys ...string,
) (
	err error,
)

func (*MinioSDK) Exists added in v1.0.0

func (a *MinioSDK) Exists(
	ctx context.Context,
	key string,
) (
	bool,
	error,
)

func (*MinioSDK) List added in v1.0.0

func (a *MinioSDK) List(
	ctx context.Context,
	prefix string,
	fn func(bool, string, int64) (bool, error),
) error

func (*MinioSDK) Read added in v1.0.0

func (a *MinioSDK) Read(
	ctx context.Context,
	key string,
	min *int64,
	max *int64,
) (
	r io.ReadCloser,
	err error,
)

func (*MinioSDK) Stat added in v1.0.0

func (a *MinioSDK) Stat(
	ctx context.Context,
	key string,
) (
	size int64,
	err error,
)

func (*MinioSDK) Write added in v1.0.0

func (a *MinioSDK) Write(
	ctx context.Context,
	key string,
	r io.Reader,
	size int64,
	expire *time.Time,
) (
	err error,
)

type MutableFileService

type MutableFileService interface {
	FileService

	// NewMutator creates a new mutator
	NewMutator(ctx context.Context, filePath string) (Mutator, error)
}

MutableFileService is an extension interface to FileService that allows mutation

type Mutator added in v0.6.0

type Mutator interface {

	// Mutate mutates file contents
	Mutate(ctx context.Context, entries ...IOEntry) error

	// Append appends data to the file
	// all IOEntry.Offset values are based on the end-of-file position
	// for example, passing IOEntry{Offset: 0, Size: 1, Data: []byte("a")} will append "a" to the end of the file
	Append(ctx context.Context, entries ...IOEntry) error

	// Close closes the mutator
	// Must be called after finishing mutation
	Close() error
}
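
A hedged sketch of mutating a file through a MutableFileService such as LocalFS; per the comment above, Append offsets are relative to the end of the file:

	mutator, err := localFS.NewMutator(ctx, "path/to/file")
	if err != nil {
		// handle error
	}
	defer mutator.Close()

	// overwrite the first byte of the file
	if err := mutator.Mutate(ctx, fileservice.IOEntry{Offset: 0, Size: 1, Data: []byte("x")}); err != nil {
		// handle error
	}

	// append one byte after the current end of the file
	if err := mutator.Append(ctx, fileservice.IOEntry{Offset: 0, Size: 1, Data: []byte("a")}); err != nil {
		// handle error
	}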

type NewFileServicesFunc added in v0.6.0

type NewFileServicesFunc = func(defaultName string) (*FileServices, error)

NewFileServicesFunc creates a new *FileServices

type ObjectStorage added in v1.0.0

type ObjectStorage interface {
	// List lists objects with specified prefix
	List(
		ctx context.Context,
		prefix string,
		fn func(isPrefix bool, key string, size int64) (bool, error),
	) (
		err error,
	)

	// Stat returns information about an object
	Stat(
		ctx context.Context,
		key string,
	) (
		size int64,
		err error,
	)

	// Exists reports whether specified object exists
	Exists(
		ctx context.Context,
		key string,
	) (
		bool,
		error,
	)

	// Write writes an object
	Write(
		ctx context.Context,
		key string,
		r io.Reader,
		size int64,
		expire *time.Time,
	) (
		err error,
	)

	// Read returns an io.Reader for specified object range
	Read(
		ctx context.Context,
		key string,
		min *int64,
		max *int64,
	) (
		r io.ReadCloser,
		err error,
	)

	// Delete deletes objects
	Delete(
		ctx context.Context,
		keys ...string,
	) (
		err error,
	)
}
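
A hedged sketch of a write-then-stat round trip against any ObjectStorage implementation (for example one returned by NewMinioSDK); the key is illustrative, and passing a nil expire is assumed to mean no expiration:

	data := []byte("hello")
	if err := storage.Write(ctx, "some/key", bytes.NewReader(data), int64(len(data)), nil); err != nil {
		// handle error
	}

	size, err := storage.Stat(ctx, "some/key")
	if err != nil {
		// handle error
	}
	_ = size // expected to equal len(data)

	ok, err := storage.Exists(ctx, "some/key")
	if err != nil {
		// handle error
	}
	_ = ok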

type ObjectStorageArguments added in v1.0.0

type ObjectStorageArguments struct {
	// misc
	Name                 string `toml:"name"`
	KeyPrefix            string `toml:"key-prefix"`
	SharedConfigProfile  string `toml:"shared-config-profile"`
	NoDefaultCredentials bool   `toml:"no-default-credentials"`
	NoBucketValidation   bool   `toml:"no-bucket-validation"`

	// s3
	Bucket    string   `toml:"bucket"`
	Endpoint  string   `toml:"endpoint"`
	IsMinio   bool     `toml:"is-minio"`
	Region    string   `toml:"region"`
	CertFiles []string `toml:"cert-files"`

	// credentials
	RoleARN         string `json:"-" toml:"role-arn"`
	BearerToken     string `json:"-" toml:"bearer-token"`
	ExternalID      string `json:"-" toml:"external-id"`
	KeyID           string `json:"-" toml:"key-id"`
	KeySecret       string `json:"-" toml:"key-secret"`
	RAMRole         string `json:"-" toml:"ram-role"`
	RoleSessionName string `json:"-" toml:"role-session-name"`
	SecurityToken   string `json:"-" toml:"security-token"`
	SessionToken    string `json:"-" toml:"session-token"`
}

func (*ObjectStorageArguments) SetFromString added in v1.0.0

func (o *ObjectStorageArguments) SetFromString(arguments []string) error

func (ObjectStorageArguments) String added in v1.0.1

func (o ObjectStorageArguments) String() string

type OnDiskCacheEvictFunc added in v1.0.0

type OnDiskCacheEvictFunc = func(
	diskFilePath string,
)

type OnDiskCacheWrittenFunc added in v1.0.0

type OnDiskCacheWrittenFunc = func(
	filePath string,
	entry IOEntry,
)

type Path added in v0.6.0

type Path struct {
	Service          string
	ServiceArguments []string
	File             string
}

func ParsePath added in v0.6.0

func ParsePath(s string) (path Path, err error)
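
ParsePath splits a path of the form documented on IOVector.FilePath; a sketch (the expected field values are an inference from that format):

	path, err := fileservice.ParsePath("s3:a/b/c")
	if err != nil {
		// handle error
	}
	// expected: path.Service == "s3", path.File == "a/b/c"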

func ParsePathAtService added in v0.6.0

func ParsePathAtService(s string, serviceStr string) (path Path, err error)

func (Path) ServiceString added in v0.8.0

func (p Path) ServiceString() string

func (Path) String added in v0.8.0

func (p Path) String() string

type Policy added in v1.0.0

type Policy uint64

func (Policy) Any added in v1.0.0

func (c Policy) Any(policies ...Policy) bool

func (Policy) CacheFullFile added in v1.0.0

func (c Policy) CacheFullFile() bool

func (Policy) CacheIOEntry added in v1.0.0

func (c Policy) CacheIOEntry() bool
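
A sketch of combining the package-level Skip* constants into a Policy for a read that bypasses cache writes, assuming those constants are intended to be OR-ed into IOVector.Policy and that fs is any FileService:

	vector := fileservice.IOVector{
		FilePath: "path/to/file",
		Entries:  []fileservice.IOEntry{{Offset: 0, Size: -1}}, // read to the end of the file
		Policy:   fileservice.SkipMemoryCacheWrites | fileservice.SkipDiskCacheWrites,
	}
	if err := fs.Read(ctx, &vector); err != nil {
		// handle error
	}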

type Pool added in v0.8.0

type Pool[T any] struct {
	// contains filtered or unexported fields
}

func NewPool added in v0.8.0

func NewPool[T any](
	capacity uint32,
	newFunc func() T,
	resetFunc func(T),
	finallyFunc func(T),
) *Pool[T]

func (*Pool[T]) Get added in v0.8.0

func (p *Pool[T]) Get(ptr *T) PutBack[T]

func (*Pool[T]) Put added in v0.8.0

func (p *Pool[T]) Put(idx int, ptr *T)
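
A sketch of a fixed-capacity pool of byte buffers; the reset and finalizer functions here are illustrative no-ops:

	// T is inferred as []byte from newFunc
	pool := fileservice.NewPool(
		128,                                         // capacity
		func() []byte { return make([]byte, 4096) }, // newFunc
		func(buf []byte) {},                         // resetFunc (no-op here)
		func(buf []byte) {},                         // finallyFunc (no-op here)
	)

	var buf []byte
	putBack := pool.Get(&buf)
	// ... use buf ...
	putBack.Put() // return the buffer to the pool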

type ProfileHandler added in v0.8.0

type ProfileHandler struct {
	// contains filtered or unexported fields
}

func NewProfileHandler added in v0.8.0

func NewProfileHandler() *ProfileHandler

func (*ProfileHandler) AddSample added in v0.8.0

func (p *ProfileHandler) AddSample(duration time.Duration, tags ...string)

func (*ProfileHandler) ServeHTTP added in v0.8.0

func (p *ProfileHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)
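
ProfileHandler implements http.Handler, so the package-level FSProfileHandler can be mounted on a debug mux; the route below is hypothetical:

	mux := http.NewServeMux()
	mux.Handle("/debug/fs-profile", fileservice.FSProfileHandler)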

func (*ProfileHandler) StartProfile added in v0.8.0

func (p *ProfileHandler) StartProfile() (
	write func(w io.Writer),
	stop func(),
)

type ProfileInfo added in v1.0.0

type ProfileInfo struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewProfileInfo added in v1.0.0

func NewProfileInfo() *ProfileInfo

type ProfileSpan added in v1.0.0

type ProfileSpan struct {
	BeginTime       time.Time
	Locations       []*profile.Location
	SubSpanDuration time.Duration
}

type PutBack added in v0.8.0

type PutBack[T any] struct {
	// contains filtered or unexported fields
}

func (PutBack[T]) Put added in v0.8.0

func (pb PutBack[T]) Put()

type RCBytes added in v1.0.0

type RCBytes struct {
	*RCPoolItem[[]byte]
}

RCBytes represents a reference counting []byte from a pool. A newly created RCBytes has a ref count of 1. The owner should call Release to give it back to the pool; a new sharing owner should call Retain to increase the ref count.

func (RCBytes) Bytes added in v1.0.0

func (r RCBytes) Bytes() []byte

func (RCBytes) Copy added in v1.0.0

func (r RCBytes) Copy() []byte

func (RCBytes) Release added in v1.0.0

func (r RCBytes) Release()

func (RCBytes) Slice added in v1.0.0

func (r RCBytes) Slice(length int) CacheData

type RCPool added in v1.0.0

type RCPool[T any] struct {
	// contains filtered or unexported fields
}

RCPool represents a pool of reference-counted objects

func NewRCPool added in v1.0.0

func NewRCPool[T any](
	newFunc func() T,
) *RCPool[T]

func (*RCPool[T]) Get added in v1.0.0

func (r *RCPool[T]) Get() *RCPoolItem[T]
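
A sketch of a reference-counted pool of byte buffers; the initial ref count of 1 is an assumption carried over from the RCBytes documentation above:

	pool := fileservice.NewRCPool(func() []byte {
		return make([]byte, 4096)
	})

	item := pool.Get() // ref count presumably starts at 1, as for RCBytes
	buf := item.Value  // the pooled value
	_ = buf
	item.Retain()  // a second owner shares the item
	item.Release() // first owner is done
	item.Release() // last Release presumably returns the item to the pool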

type RCPoolItem added in v1.0.0

type RCPoolItem[T any] struct {
	Value T
	// contains filtered or unexported fields
}

func (*RCPoolItem[T]) Release added in v1.0.0

func (r *RCPoolItem[T]) Release()

func (*RCPoolItem[T]) Retain added in v1.0.0

func (r *RCPoolItem[T]) Retain()

type RemoteCache added in v1.0.0

type RemoteCache struct {
	// contains filtered or unexported fields
}

RemoteCache is the cache for remote read.

func NewRemoteCache added in v1.0.0

func NewRemoteCache(client client.CacheClient, factory KeyRouterFactory) *RemoteCache

func (*RemoteCache) DeletePaths added in v1.1.0

func (r *RemoteCache) DeletePaths(ctx context.Context, paths []string) error

func (*RemoteCache) Flush added in v1.0.0

func (r *RemoteCache) Flush()

func (*RemoteCache) Read added in v1.0.0

func (r *RemoteCache) Read(ctx context.Context, vector *IOVector) error

func (*RemoteCache) Update added in v1.0.0

func (r *RemoteCache) Update(ctx context.Context, vector *IOVector, async bool) error

type ReplaceableFileService added in v0.6.0

type ReplaceableFileService interface {
	FileService

	Replace(ctx context.Context, vector IOVector) error
}

ReplaceableFileService is an extension interface to FileService that allows replacing a whole file

type S3FS

type S3FS struct {
	// contains filtered or unexported fields
}

S3FS is a FileService implementation backed by S3

func NewS3FS

func NewS3FS(
	ctx context.Context,
	args ObjectStorageArguments,
	cacheConfig CacheConfig,
	perfCounterSets []*perfcounter.CounterSet,
	noCache bool,
	noDefaultCredential bool,
) (*S3FS, error)

func (*S3FS) Delete

func (s *S3FS) Delete(ctx context.Context, filePaths ...string) error

func (*S3FS) ETLCompatible added in v0.6.0

func (*S3FS) ETLCompatible()

func (*S3FS) FlushCache added in v0.6.0

func (s *S3FS) FlushCache()

func (*S3FS) List

func (s *S3FS) List(ctx context.Context, dirPath string) (entries []DirEntry, err error)

func (*S3FS) Name added in v0.6.0

func (s *S3FS) Name() string

func (*S3FS) PrefetchFile added in v1.0.1

func (s *S3FS) PrefetchFile(ctx context.Context, filePath string) error

func (*S3FS) Read

func (s *S3FS) Read(ctx context.Context, vector *IOVector) (err error)

func (*S3FS) ReadCache added in v1.0.0

func (s *S3FS) ReadCache(ctx context.Context, vector *IOVector) (err error)

func (*S3FS) SetAsyncUpdate added in v0.8.0

func (s *S3FS) SetAsyncUpdate(b bool)

func (*S3FS) StatFile added in v0.7.0

func (s *S3FS) StatFile(ctx context.Context, filePath string) (*DirEntry, error)

func (*S3FS) Write

func (s *S3FS) Write(ctx context.Context, vector IOVector) error

type SpanProfiler added in v1.0.0

type SpanProfiler struct {
	// contains filtered or unexported fields
}

SpanProfiler profiles a span in one goroutine. Multiple SpanProfilers may share one *profile.Profile.

func NewSpanProfiler added in v1.0.0

func NewSpanProfiler() *SpanProfiler

NewSpanProfiler creates a new span profiler

func (*SpanProfiler) Begin added in v1.0.0

func (s *SpanProfiler) Begin(skip int) (profiler *SpanProfiler, end func())

Begin begins a new span. If the calling goroutine does not match s.goID, a new profiler for the calling goroutine will be created. The newly created profiler will share the same profile as s.profile.

func (*SpanProfiler) Write added in v1.0.0

func (s *SpanProfiler) Write(w io.Writer) error

Write writes the packed profile to writer w

type TargetCacheKeys added in v1.0.0

type TargetCacheKeys map[string][]*pb.RequestCacheKey

Directories

Path Synopsis
checks
