s3

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2021 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Download

func Download(ctx context.Context, storage *S3, src string, dest string) (err error)
func (s *S3) Validate() error {
	s.Ak = strings.Trim(s.Ak, " ")
	s.Sk = strings.Trim(s.Sk, " ")
	s.Bucket = strings.Trim(s.Bucket, " /")
	s.Subfolder = strings.Trim(s.Subfolder, " /")
	s.Endpoint = strings.Trim(s.Endpoint, " /")

	if s.Ak == "" || s.Sk == "" || s.Bucket == "" || s.Endpoint == "" {
		return errors.New("required field is missing")
	}

	return nil
}

// Validate the existence of bucket

func Validate(s *S3) (err error) {
	var minioClient *minio.Client
	if minioClient, err = getMinioClient(s); err != nil {
		return
	}

	var exists bool

	if exists, err = minioClient.BucketExists(s.Bucket); err != nil {
		return
	} else if !exists {
		err = fmt.Errorf("no bucket named %s", s.Bucket)
		return
	}

	return
}

Download the file to object storage

func ListFiles

func ListFiles(storage *S3, prefix string, recursive bool) (files []string, err error)

ListFiles with specific prefix

func Upload

func Upload(ctx context.Context, storage *S3, src string, dest string) (err error)

// RemoveFiles will attempt to remove files specified in prefixList in `target` s3 endpoint-bucket recursively. // If dryRun is given, the removal will be no op other than a log. // Note that this API makes its best attempt to remove given files, if the removal fails, it will log error but the // function just continues.

func RemoveFiles(target *S3, prefixList []string, dryRun bool) {
	log := log.SugaredLogger()
	minioClient, err := getMinioClient(target)
	if err != nil {
		log.Errorf("Fail to create minioClient for storage: %v; err: %v", target, err)
		return
	}

	objectsCh := make(chan string)
	var wg sync.WaitGroup
	var cnt int64
	for _, prefix := range prefixList {
		wg.Add(1)
		go func(filePrefix string) {
			defer wg.Done()
			objects, err := ListFiles(target, filePrefix, true /* recursive */)
			if err != nil {
				// Failing to remove some S3 files is not fatal, log and move on to try next batch
				log.Errorf("Error detected while listing files to be removed for [%v-%v-%v] with error: %v",
					target.Endpoint, target.Bucket, filePrefix, err)
				return
			}
			atomic.AddInt64(&cnt, int64(len(objects)))
			for _, o := range objects {
				o = filepath.Join(target.GetObjectPath(""), o)
				if !dryRun {
					objectsCh <- o
				} else {
					log.Infof("%v-%v removing file [dryRun %v]: %v",
						target.Endpoint, target.Bucket, dryRun, o)
				}
			}
		}(prefix)
	}
	errCh := minioClient.RemoveObjects(target.Bucket, objectsCh)
	if waitTimeout(&wg, 15*time.Minute) {
		log.Errorf("%v-%v removing files timeout waiting for all groups [dryRun %v]", target.Endpoint, target.Bucket, dryRun)
	}
	close(objectsCh)
	log.Infof("%v-%v removing %d files [dryRun %v]", target.Endpoint, target.Bucket, cnt, dryRun)
	for err := range errCh {
		log.Errorf("%v-%v removing file failed with error: %v", target.Endpoint, target.Bucket, err)
	}
}

Upload the file to object storage

Types

type S3

type S3 struct {
	*Storage
}

func NewS3StorageFromEncryptedURI

func NewS3StorageFromEncryptedURI(encryptedURI string) (*S3, error)

func NewS3StorageFromURL

func NewS3StorageFromURL(uri string) (*S3, error)

func (*S3) GetEncryptedURL

func (s *S3) GetEncryptedURL() (encrypted string, err error)

func (*S3) GetObjectPath

func (s *S3) GetObjectPath(name string) string

func (*S3) GetSchema

func (s *S3) GetSchema() string

func (*S3) GetURI

func (s *S3) GetURI() string

func (*S3) GetURL

func (s *S3) GetURL() string

type Storage

type Storage struct {
	//ID          primitive.ObjectID `bson:"_id"         json:"id"`
	Ak          string `bson:"ak"          json:"ak"`
	Sk          string `bson:"-"           json:"sk"`
	Endpoint    string `bson:"endpoint"    json:"endpoint"`
	Bucket      string `bson:"bucket"      json:"bucket"`
	Subfolder   string `bson:"subfolder"   json:"subfolder"`
	Insecure    bool   `bson:"insecure"    json:"insecure"`
	IsDefault   bool   `bson:"is_default"  json:"is_default"`
	EncryptedSk string `bson:"encryptedSk" json:"-"`
	UpdatedBy   string `bson:"updated_by"  json:"updated_by"`
	UpdateTime  int64  `bson:"update_time" json:"update_time"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL