drh

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2021 License: Apache-2.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MaxRetries is the maximum number of retries after a failure; it is used globally.
	// It is intentionally not exposed as an option.
	MaxRetries int = 5

	// DefaultMaxKeys is the maximum number of keys returned per listing request, default is 1000.
	DefaultMaxKeys int32 = 1000

	// DefaultMultipartThreshold is the threshold size (in MB) used to decide whether to use multipart upload.
	// When the object size is greater than or equal to MultipartThreshold, multipart upload will be used.
	DefaultMultipartThreshold int = 10

	// DefaultChunkSize is the chunk size (in MB) for each part when using multipart upload.
	DefaultChunkSize int = 5

	// DefaultMaxParts is the maximum number of parts (10000) for a multipart upload.
	DefaultMaxParts int = 10000

	// DefaultMessageBatchSize is the number of messages in a batch to send to the SQS queue.
	DefaultMessageBatchSize int = 10

	// DefaultFinderDepth is the depth of sub folders to compare in parallel.
	// 0 means comparing all objects together with no parallelism.
	DefaultFinderDepth int = 0

	// DefaultFinderNumber is the number of finder threads to run in parallel.
	DefaultFinderNumber int = 1

	// DefaultWorkerNumber is the number of worker threads to run in parallel.
	DefaultWorkerNumber int = 4
)
View Source
// Actions that can be taken for an object. The values are consecutive
// untyped integer constants starting at 0 (iota).
const (
	// Ignore is an action that does nothing.
	Ignore = iota

	// Delete is an action to delete an object.
	Delete

	// Transfer is an action to transfer an object.
	Transfer
)

Variables

View Source
// Size units expressed in bytes.
// NOTE(review): these are declared as variables rather than constants, so
// they are technically reassignable — presumably never modified; confirm.
var (
	// KB is 1024 bytes (1 << 10).
	KB int = 1 << 10

	// MB is 1024 KB (1 << 20 bytes).
	MB int = 1 << 20
)

Functions

This section is empty.

Types

type Client

// Client is an interface used to communicate with Cloud Storage Services.
// Its methods are grouped into read operations and write operations.
type Client interface {
	// READ operations: object metadata and content, object and common-prefix
	// listings, and the parts / upload ID of a multipart upload.
	HeadObject(ctx context.Context, key *string) *Metadata
	GetObject(ctx context.Context, key *string, size, start, chunkSize int64, version string) ([]byte, error)
	ListObjects(ctx context.Context, continuationToken, prefix *string, maxKeys int32) ([]*Object, error)
	ListCommonPrefixes(ctx context.Context, depth int, maxKeys int32) (prefixes []*string)
	ListParts(ctx context.Context, key, uploadID *string) (parts map[int]*Part)
	GetUploadID(ctx context.Context, key *string) (uploadID *string)

	// WRITE operations: single-request upload, the multipart upload
	// lifecycle (create, upload part, complete, abort), and object deletion.
	PutObject(ctx context.Context, key *string, body []byte, storageClass, acl *string, meta *Metadata) (etag *string, err error)
	CreateMultipartUpload(ctx context.Context, key, storageClass, acl *string, meta *Metadata) (uploadID *string, err error)
	CompleteMultipartUpload(ctx context.Context, key, uploadID *string, parts []*Part) (etag *string, err error)
	UploadPart(ctx context.Context, key *string, body []byte, uploadID *string, partNumber int) (etag *string, err error)
	AbortMultipartUpload(ctx context.Context, key, uploadID *string) (err error)
	DeleteObject(ctx context.Context, key *string) (err error)
}

Client is an interface used to communicate with Cloud Storage Services.

type DBService

type DBService struct {
	// contains filtered or unexported fields
}

DBService is a wrapper service used to interact with Amazon DynamoDB

func NewDBService

func NewDBService(ctx context.Context, tableName string) (*DBService, error)

NewDBService is a helper func to create a DBService instance

func (*DBService) PutItem added in v1.0.0

func (db *DBService) PutItem(ctx context.Context, o *Object) error

PutItem is a function that creates a new item, or replaces an old item with a new item, in DynamoDB. Restarting the transfer of an object will replace the old item with the new info.

func (*DBService) QueryItem added in v1.0.0

func (db *DBService) QueryItem(ctx context.Context, key *string) (*Item, error)

QueryItem is a function to query an item by Key in DynamoDB

func (*DBService) UpdateItem added in v1.0.0

func (db *DBService) UpdateItem(ctx context.Context, key *string, result *TransferResult) error

UpdateItem is a function to update an item in DynamoDB

func (*DBService) UpdateSequencer added in v1.0.0

func (db *DBService) UpdateSequencer(ctx context.Context, key, sequencer *string) error

UpdateSequencer is a function to update an item with new Sequencer in DynamoDB

type Finder

type Finder struct {
	// contains filtered or unexported fields
}

Finder is an implementation of the Job interface. Finder compares the differences between source and destination and sends the delta to SQS.

func NewFinder

func NewFinder(ctx context.Context, cfg *JobConfig) (f *Finder)

NewFinder creates a new Finder instance

func (*Finder) Run

func (f *Finder) Run(ctx context.Context)

Run is main execution function for Finder.

type Item

// Item holds info about the items to be stored in DynamoDB.
// NOTE(review): the units of StartTimestamp, EndTimestamp and SpentTime and
// the format of StartTime / EndTime are not visible here — confirm against
// the code that populates the item.
type Item struct {
	ObjectKey                                     string
	JobStatus, Etag, Sequencer                    string
	Size, StartTimestamp, EndTimestamp, SpentTime int64
	StartTime, EndTime                            string
}

Item holds info about the items to be stored in DynamoDB

type Job

// Job is an interface of a process to be run by this tool.
// A Job must have a Run method that accepts a context.
type Job interface {
	Run(ctx context.Context)
}

Job is an interface of a process to be run by this tool. A Job must have a Run() method.

type JobConfig

// JobConfig is the general job info: source settings (Src*), destination
// settings (Dest*), the job table and queue names, flags telling whether the
// source / destination are in the current account, and the embedded
// *JobOptions.
type JobConfig struct {
	SrcType, SrcBucket, SrcPrefix, SrcRegion, SrcEndpoint, SrcCredential          string
	DestBucket, DestPrefix, DestRegion, DestCredential, DestStorageClass, DestAcl string
	JobTableName, JobQueueName                                                    string
	SrcInCurrentAccount, DestInCurrentAccount                                     bool
	// JobOptions is embedded by pointer, so its fields are promoted onto JobConfig.
	*JobOptions
}

JobConfig is General Job Info

type JobOptions

// JobOptions holds the tunable options of a job.
// NOTE(review): the field names mirror the Default* constants of this package
// (e.g. ChunkSize / DefaultChunkSize, MaxKeys / DefaultMaxKeys) — presumably
// those constants are the default values; confirm against the caller.
type JobOptions struct {
	ChunkSize, MultipartThreshold, MessageBatchSize, FinderDepth, FinderNumber, WorkerNumber int
	MaxKeys                                                                                  int32
	IncludeMetadata                                                                          bool
}

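JobOptions holds the general job options (chunk size, multipart threshold, batch size, finder and worker settings, max keys, and whether to include metadata).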

type Metadata

// Metadata holds the metadata info of an object.
// The string fields are pointers — presumably nil when the value is not set;
// confirm against the code that fills them.
type Metadata struct {
	// ContentType is the object's content type.
	ContentType *string

	// ContentLanguage is the object's content language.
	ContentLanguage *string

	// ContentEncoding is the object's content encoding.
	ContentEncoding *string

	// CacheControl is the object's cache-control setting.
	CacheControl *string

	// Metadata is the custom metadata to store with the object in S3.
	// Map keys will be normalized to lower-case.
	Metadata map[string]string
}

Metadata info of object

type Object

// Object represents an object to be replicated.
type Object struct {
	// Key is serialized under the JSON name "key".
	Key       string `json:"key"`
	// Size is serialized under the JSON name "size".
	// NOTE(review): presumably the object size in bytes — confirm.
	Size      int64  `json:"size"`
	// Sequencer is serialized under the JSON name "sequencer" and is omitted
	// from the JSON output when empty.
	Sequencer string `json:"sequencer,omitempty"`
}

Object represents an object to be replicated.

type Part

type Part struct {
	// contains filtered or unexported fields
}

Part represents a part for multipart upload

type S3Client

type S3Client struct {
	// contains filtered or unexported fields
}

S3Client is an implementation of Client interface for Amazon S3

func NewS3Client

func NewS3Client(ctx context.Context, bucket, prefix, endpoint, region, sourceType string, cred *S3Credentials) *S3Client

NewS3Client creates a S3Client instance

func (*S3Client) AbortMultipartUpload

func (c *S3Client) AbortMultipartUpload(ctx context.Context, key, uploadID *string) (err error)

AbortMultipartUpload is to abort failed multipart upload

func (*S3Client) CompleteMultipartUpload

func (c *S3Client) CompleteMultipartUpload(ctx context.Context, key, uploadID *string, parts []*Part) (etag *string, err error)

CompleteMultipartUpload is a function to combine all parts uploaded and create the full object.

func (*S3Client) CreateMultipartUpload

func (c *S3Client) CreateMultipartUpload(ctx context.Context, key, storageClass, acl *string, meta *Metadata) (uploadID *string, err error)

CreateMultipartUpload is a function to initialize a multipart upload process. This func returns an upload ID used to identify the multipart upload. All parts will be uploaded with this upload ID; after that, all parts with this ID will be combined to create the full object.

func (*S3Client) DeleteObject added in v1.0.0

func (c *S3Client) DeleteObject(ctx context.Context, key *string) (err error)

DeleteObject is a function to delete an object from Amazon S3

func (*S3Client) GetObject

func (c *S3Client) GetObject(ctx context.Context, key *string, size, start, chunkSize int64, version string) ([]byte, error)

GetObject is a function to get (download) object from Amazon S3

func (*S3Client) GetUploadID added in v1.0.0

func (c *S3Client) GetUploadID(ctx context.Context, key *string) (uploadID *string)

GetUploadID use ListMultipartUploads to get the last unfinished upload ID by key

func (*S3Client) HeadObject

func (c *S3Client) HeadObject(ctx context.Context, key *string) *Metadata

HeadObject is a function to get extra metadata from Amazon S3

func (*S3Client) ListCommonPrefixes

func (c *S3Client) ListCommonPrefixes(ctx context.Context, depth int, maxKeys int32) (prefixes []*string)

ListCommonPrefixes is a function to list common prefixes.

func (*S3Client) ListObjects

func (c *S3Client) ListObjects(ctx context.Context, continuationToken, prefix *string, maxKeys int32) ([]*Object, error)

ListObjects is a function to list objects from Amazon S3

func (*S3Client) ListParts

func (c *S3Client) ListParts(ctx context.Context, key, uploadID *string) (parts map[int]*Part)

ListParts returns list of parts by upload ID in a map

func (*S3Client) PutObject

func (c *S3Client) PutObject(ctx context.Context, key *string, body []byte, storageClass, acl *string, meta *Metadata) (etag *string, err error)

PutObject is a function to put (upload) an object to Amazon S3

func (*S3Client) UploadPart

func (c *S3Client) UploadPart(ctx context.Context, key *string, body []byte, uploadID *string, partNumber int) (etag *string, err error)

UploadPart is a function to upload one part (identified by its part number) of a multipart upload to Amazon S3

type S3Credentials

type S3Credentials struct {
	// contains filtered or unexported fields
}

S3Credentials is the credentials type passed to NewS3Client when creating an S3Client

type S3Event added in v1.0.0

// S3Event represents a basic structure of an S3 event message.
// Only the fields declared below are captured.
type S3Event struct {
	// Records is the list of records contained in the event message.
	Records []struct {
		EventSource, AwsRegion, EventTime, EventName string
		// S3 embeds Object, which is mapped to the "object" JSON key.
		S3                                           struct {
			Object `json:"object"`
		}
	}
}

S3Event represents a basic structure of an S3 Event Message. See https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html for more details.

type SqsService

type SqsService struct {
	// contains filtered or unexported fields
}

SqsService is a wrapper service used to interact with Amazon SQS

func NewSqsService

func NewSqsService(ctx context.Context, queueName string) (*SqsService, error)

NewSqsService is a helper func to create a SqsService instance

func (*SqsService) ChangeVisibilityTimeout

func (ss *SqsService) ChangeVisibilityTimeout(ctx context.Context, rh *string, seconds int32) (ok bool)

ChangeVisibilityTimeout function is used to change the Visibility Timeout of a message

func (*SqsService) DeleteMessage

func (ss *SqsService) DeleteMessage(ctx context.Context, rh *string) (ok bool)

DeleteMessage function is used to delete a message from the Queue. Returns true if the message is deleted successfully.

func (*SqsService) IsQueueEmpty

func (ss *SqsService) IsQueueEmpty(ctx context.Context) (isEmpty bool)

IsQueueEmpty is a function to check if the Queue is empty or not

func (*SqsService) ReceiveMessages

func (ss *SqsService) ReceiveMessages(ctx context.Context) (body, receiptHandle *string)

ReceiveMessages function receives messages in batch from the Queue. Currently, only 1 message is returned, as MaxNumberOfMessages defaults to 1.

func (*SqsService) SendMessage

func (ss *SqsService) SendMessage(ctx context.Context, body *string)

SendMessage function sends 1 message at a time to the Queue

func (*SqsService) SendMessageInBatch

func (ss *SqsService) SendMessageInBatch(ctx context.Context, batch []*string)

SendMessageInBatch function sends messages to the Queue in batch. Each batch can only contain up to 10 messages.

type SsmService

type SsmService struct {
	// contains filtered or unexported fields
}

SsmService is a wrapper service used to interact with Amazon SSM

func NewSsmService

func NewSsmService(ctx context.Context) (*SsmService, error)

NewSsmService is a helper func to create a SsmService instance

func (*SsmService) GetParameterValue

func (s *SsmService) GetParameterValue(ctx context.Context, param *string, withDecryption bool) *string

GetParameterValue is a function to read the value of a parameter in System Manager Parameter Store

type TransferResult

type TransferResult struct {
	// contains filtered or unexported fields
}

TransferResult stores the result after transfer.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is an implementation of the Job interface. Worker is used to consume the messages from SQS and start the transferring.

func NewWorker

func NewWorker(ctx context.Context, cfg *JobConfig) (w *Worker)

NewWorker creates a new Worker instance

func (*Worker) Run

func (w *Worker) Run(ctx context.Context)

Run a Worker job

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL