storage

package
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 12, 2020 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Overview

Copyright 2019 The OpenSDS Authors.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2019 The OpenSDS Authors.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
const (
	MON_TIMEOUT         = "10"
	OSD_TIMEOUT         = "10"
	STRIPE_UNIT         = 512 << 10 /* 512K */
	STRIPE_COUNT        = 2
	OBJECT_SIZE         = 8 << 20         /* 8M */
	BUFFER_SIZE         = 1 << 20         /* 1M */
	MIN_CHUNK_SIZE      = 512 << 10       /* 512K */
	MAX_CHUNK_SIZE      = 8 * BUFFER_SIZE /* 8M */
	SMALL_FILE_POOLNAME = "rabbit"
	BIG_FILE_POOLNAME   = "tiger"
	BIG_FILE_THRESHOLD  = 128 << 10 /* 128K */
	AIO_CONCURRENT      = 4
)
View Source
const (
	GC_OBJECT_LIMIT_NUM = 10000
	// max interval time of gc in seconds.
	GC_MAX_INTERVAL_TIME   = 3600
	CEPH_OBJ_NON_EXIST_ERR = "rados: ret=-2"
)
View Source
const (
	MAX_PART_SIZE   = 5 << 30 // 5GB
	MIN_PART_SIZE   = 5 << 20 // 5MB
	MIN_PART_NUMBER = 1
	MAX_PART_NUMBER = 10000
)
View Source
const (
	AES_BLOCK_SIZE               = 16
	ENCRYPTION_KEY_LENGTH        = 32 // key size for AES-"256"
	INITIALIZATION_VECTOR_LENGTH = 16 // block size of AES
	DEFAULT_CEPHCONFIG_PATTERN   = "conf/*.conf"
)
View Source
const CLUSTER_MAX_USED_SPACE_PERCENT = 85

Variables

View Source
var (
	RootContext = context.Background()
)

Functions

This section is empty.

Types

type CephStorage

type CephStorage struct {
	Name       string
	Conn       *rados.Conn
	InstanceId uint64
	CountMutex *sync.Mutex
	Counter    uint64
	BufPool    *sync.Pool
	BigBufPool *sync.Pool
}

func NewCephStorage

func NewCephStorage(configFile string) *CephStorage

func (*CephStorage) Append

func (cluster *CephStorage) Append(poolname string, oid string, data io.Reader, offset uint64, isExist bool) (size int64, err error)

func (*CephStorage) GetUniqUploadName

func (cluster *CephStorage) GetUniqUploadName() string

func (*CephStorage) GetUsedSpacePercent

func (cluster *CephStorage) GetUsedSpacePercent() (pct int, err error)

func (*CephStorage) Put

func (cluster *CephStorage) Put(poolname string, oid string, data io.Reader) (size int64, err error)

func (*CephStorage) Remove

func (cluster *CephStorage) Remove(poolname string, oid string) error

func (*CephStorage) Shutdown

func (c *CephStorage) Shutdown()

type GcMgr

type GcMgr struct {
	// contains filtered or unexported fields
}

func NewGcMgr

func NewGcMgr(ctx context.Context, yig *YigStorage, loopTime int64) *GcMgr

func (*GcMgr) CreateGcObjectRecordCleanStream

func (gm *GcMgr) CreateGcObjectRecordCleanStream(in ...<-chan *GcObjectResult) <-chan *GcObjectResult

func (*GcMgr) CreateObjectDeleteStream

func (gm *GcMgr) CreateObjectDeleteStream(in <-chan *types.GcObject) <-chan *GcObjectResult

func (*GcMgr) QueryGcObjectStream

func (gm *GcMgr) QueryGcObjectStream() <-chan *types.GcObject

func (*GcMgr) Start

func (gm *GcMgr) Start()

func (*GcMgr) Stop

func (gm *GcMgr) Stop()

type GcObjectResult

type GcObjectResult struct {
	ErrNo    S3ErrorCode
	Err      error
	Id       int64
	ObjectId string
}

type MultipartReader

type MultipartReader struct {
	// contains filtered or unexported fields
}

func NewMultipartReader

func NewMultipartReader(yig *YigStorage, uploadIdStr string, start int64, end int64) (*MultipartReader, error)

func (*MultipartReader) Close

func (mr *MultipartReader) Close() error

func (*MultipartReader) Read

func (mr *MultipartReader) Read(p []byte) (int, error)

type ObjectMetaInfo

type ObjectMetaInfo struct {
	Cluster string
	Pool    string
}

func ParseObjectMeta

func ParseObjectMeta(meta string) (ObjectMetaInfo, error)

type RadosDownloader

type RadosDownloader struct {
	// contains filtered or unexported fields
}

func (*RadosDownloader) Close

func (rd *RadosDownloader) Close() error

func (*RadosDownloader) Read

func (rd *RadosDownloader) Read(p []byte) (n int, err error)

func (*RadosDownloader) Seek

func (rd *RadosDownloader) Seek(offset int64, whence int) (int64, error)

type RadosSmallDownloader

type RadosSmallDownloader struct {
	// contains filtered or unexported fields
}

func (*RadosSmallDownloader) Close

func (rd *RadosSmallDownloader) Close() error

func (*RadosSmallDownloader) Read

func (rd *RadosSmallDownloader) Read(p []byte) (n int, err error)

func (*RadosSmallDownloader) Seek

func (rd *RadosSmallDownloader) Seek(offset int64, whence int) (int64, error)

type YigStorage

type YigStorage struct {
	DataStorage map[string]*CephStorage
	MetaStorage *meta.Meta
	KMS         crypto.KMS
	Stopping    bool
	// contains filtered or unexported fields
}

YigStorage implements StorageDriver

func New

func New(cfg *config.Config) (*YigStorage, error)

func (*YigStorage) AbortMultipartUpload

func (yig *YigStorage) AbortMultipartUpload(ctx context.Context, multipartUpload *pb.MultipartUpload) error

func (*YigStorage) ChangeStorageClass

func (yig *YigStorage) ChangeStorageClass(ctx context.Context, object *pb.Object, newClass *string) error

func (*YigStorage) Close

func (y *YigStorage) Close() error

func (*YigStorage) CompleteMultipartUpload

func (yig *YigStorage) CompleteMultipartUpload(ctx context.Context, multipartUpload *pb.MultipartUpload,
	completeUpload *model.CompleteMultipartUpload) (*model.CompleteMultipartUploadResult, error)

func (*YigStorage) Copy

func (yig *YigStorage) Copy(ctx context.Context, stream io.Reader, target *pb.Object) (result dscommon.PutResult, err error)

func (*YigStorage) Delete

func (yig *YigStorage) Delete(ctx context.Context, object *pb.DeleteObjectInput) error

* @objectId: input object id which will be deleted. * Below is the process logic: * 1. check whether objectId is multipart uploaded, if so * retrieve all the object ids from multiparts and put them into gc. * or else, put it into gc. *

func (*YigStorage) Get

func (yig *YigStorage) Get(ctx context.Context, object *pb.Object, start int64, end int64) (io.ReadCloser, error)

func (*YigStorage) GetClusterByFsName

func (yig *YigStorage) GetClusterByFsName(fsName string) (cluster *CephStorage, err error)

func (*YigStorage) InitMultipartUpload

func (yig *YigStorage) InitMultipartUpload(ctx context.Context, object *pb.Object) (*pb.MultipartUpload, error)

func (*YigStorage) ListParts

func (yig *YigStorage) ListParts(ctx context.Context, multipartUpload *pb.ListParts) (*model.ListPartsOutput, error)

func (*YigStorage) PickOneClusterAndPool

func (yig *YigStorage) PickOneClusterAndPool(bucket string, object string, size int64, isAppend bool) (cluster *CephStorage,
	poolName string)

func (*YigStorage) Put

func (yig *YigStorage) Put(ctx context.Context, stream io.Reader, obj *pb.Object) (result dscommon.PutResult,
	err error)

ctx should contain below elements: * size: object size. * encryptionKey: * md5: the md5 put by user for the uploading object.

func (*YigStorage) UploadPart

func (yig *YigStorage) UploadPart(ctx context.Context, stream io.Reader, multipartUpload *pb.MultipartUpload,
	partNumber int64, upBytes int64) (*model.UploadPartResult, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL