ste

package
v10.0.2+incompatible
Warning

This package is not in the latest version of its module.

Published: Sep 28, 2018 License: MIT Imports: 31 Imported by: 13

Documentation


Constants

const (
	ContentTypeMaxBytes     = 256  // If > 65535, then the type of jobPartPlanBlobData's ContentTypeLength field must change
	ContentEncodingMaxBytes = 256  // If > 65535, then the type of jobPartPlanBlobData's ContentEncodingLength field must change
	MetadataMaxBytes        = 1000 // If > 65535, then the type of jobPartPlanBlobData's MetadataLength field must change
	BlobTierMaxBytes        = 10
)
const DataSchemaVersion common.Version = 0

DataSchemaVersion defines the data schema version of the JobPart order files supported by the current version of azcopy. It is to be incremented every time azcopy is released with a changed data schema.

const DefaultServiceApiVersion = "2018-03-28"

DefaultServiceApiVersion is the default service API version. It is the value stored under the ServiceAPIVersionOverride key in every Job's context.

const DownloadMaxRetryDelay = time.Second * 60
const DownloadRetryDelay = time.Second * 1
const DownloadTryTimeout = time.Minute * 15
const EMPTY_SAS_STRING = ""
const MaxRetryPerDownloadBody = 5

download related

const PacerTimeToWaitInMs = 50

pacer related

const UploadMaxRetryDelay = time.Second * 60
const UploadMaxTries = 20

upload related

const UploadRetryDelay = time.Second * 1
const UploadTryTimeout = time.Minute * 15

Variables

var JobsAdmin interface {
	NewJobPartPlanFileName(jobID common.JobID, partNumber common.PartNumber) JobPartPlanFileName

	// JobIDs returns a point-in-time list of JobIDs
	JobIDs() []common.JobID

	// JobMgr returns the specified JobID's JobMgr
	JobMgr(jobID common.JobID) (IJobMgr, bool)
	JobMgrEnsureExists(jobID common.JobID, level common.LogLevel, commandString string) IJobMgr

	// AddJobPartMgr associates the specified JobPartMgr with the Jobs Administrator
	//AddJobPartMgr(appContext context.Context, planFile JobPartPlanFileName) IJobPartMgr
	/*ScheduleTransfer(jptm IJobPartTransferMgr)*/
	ScheduleChunk(priority common.JobPriority, chunkFunc chunkFunc)

	ResurrectJob(jobId common.JobID, sourceSAS string, destinationSAS string) bool

	ResurrectJobParts()

	QueueJobParts(jpm IJobPartMgr)

	// AppPathFolder returns the Azcopy application path folder.
	// JobPartPlanFile will be created inside this folder.
	AppPathFolder() string

	// returns the current value of bytesOverWire.
	BytesOverWire() int64

	//DeleteJob(jobID common.JobID)
	common.ILoggerCloser
}

JobsAdmin is the singleton that manages all running Jobs, their parts, and their transfers.

var ServiceAPIVersionOverride = serviceAPIVersionOverride{}

ServiceAPIVersionOverride is a global variable in package ste. It is the context key under which the service API version value is stored in every Job's context.

Functions

func BlobFSToLocal

func BlobFSToLocal(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer *pacer)

func BlobToLocal

func BlobToLocal(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer *pacer)

func CancelPauseJobOrder

func CancelPauseJobOrder(jobID common.JobID, desiredJobStatus common.JobStatus) common.CancelPauseResumeResponse

CancelPauseJobOrder cancels or pauses the job with the given JobID.

A Job cannot be cancelled or paused in the following cases:
    * If the Job has not been ordered completely, it cannot be cancelled or paused.
    * If all the transfers in the Job have either failed or completed, the Job cannot be cancelled or paused.
    * If the Job is already paused, it cannot be paused again.
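The three rules above can be read as a simple guard function. The sketch below is an assumption-laden illustration: `jobState`, its fields, and `checkCancelOrPause` are hypothetical and do not appear in the package.

```go
package main

import (
	"errors"
	"fmt"
)

// jobState is a hypothetical snapshot of a job; the field names are illustrative.
type jobState struct {
	completelyOrdered bool // the final part of the job has been ordered
	totalTransfers    int
	finishedTransfers int // transfers that have failed or completed
	paused            bool
}

// checkCancelOrPause returns nil when a cancel (wantPause=false) or
// pause (wantPause=true) request is allowed under the listed rules.
func checkCancelOrPause(s jobState, wantPause bool) error {
	switch {
	case !s.completelyOrdered:
		return errors.New("job has not been completely ordered")
	case s.finishedTransfers >= s.totalTransfers:
		return errors.New("all transfers have already failed or completed")
	case wantPause && s.paused:
		return errors.New("job is already paused")
	}
	return nil
}

func main() {
	s := jobState{completelyOrdered: true, totalTransfers: 10, finishedTransfers: 4}
	fmt.Println(checkCancelOrPause(s, true) == nil) // prints true
}
```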

func DeleteBlobPrologue

func DeleteBlobPrologue(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer *pacer)

func DeleteFilePrologue

func DeleteFilePrologue(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer *pacer)

func ExecuteNewCopyJobPartOrder

func ExecuteNewCopyJobPartOrder(order common.CopyJobPartOrderRequest) common.CopyJobPartOrderResponse

ExecuteNewCopyJobPartOrder api executes a new job part order

func FileToLocal

func FileToLocal(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer *pacer)

TODO: Unify blobToLocal and fileToLocal after code review and logic is finalized.

func GetJobFromTo

GetJobFromTo returns the job's FromTo info.

func GetJobSummary

func GetJobSummary(jobID common.JobID) common.ListJobSummaryResponse

GetJobSummary returns the job progress summary of an active job.

The Job Progress Summary contains the following properties:
    * CompleteJobOrdered - whether the final part of the job has been ordered
    * TotalTransfers - total number of transfers available for the given job
    * TotalNumberOfTransfersCompleted - total number of transfers in the job that have completed
    * NumberOfTransfersCompletedAfterCheckpoint - number of transfers completed after the last checkpoint
    * NumberOfTransferFailedAfterCheckpoint - number of transfers that failed after the last checkpoint timestamp
    * PercentageProgress - job progress reported as a percentage
    * FailedTransfers - list of transfers that failed after the last checkpoint timestamp

func ListJobTransfers

ListJobTransfers returns, in the HTTP response, the list of transfers with the specified status for the given JobID.

func ListJobs

func ListJobs() common.ListJobsResponse

ListJobs returns the JobIDs of all the jobs that exist in the current instance of azcopy.

func LocalToBlobFS

func LocalToBlobFS(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer *pacer)

func LocalToBlockBlob

func LocalToBlockBlob(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer *pacer)

func LocalToFile

func LocalToFile(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer *pacer)

func MainSTE

func MainSTE(concurrentConnections int, targetRateInMBps int64, azcopyAppPathFolder string) error

MainSTE initializes the Storage Transfer Engine

func NewBFSXferRetryPolicyFactory

func NewBFSXferRetryPolicyFactory(o XferRetryOptions) pipeline.Factory

NewBFSXferRetryPolicyFactory creates a RetryPolicyFactory object configured using the specified options.

TODO: Fix the separate retry policies.

func NewBlobFSPipeline

func NewBlobFSPipeline(c azbfs.Credential, o azbfs.PipelineOptions, r XferRetryOptions, p *pacer) pipeline.Pipeline

NewBlobFSPipeline creates a pipeline for transfers to and from the BlobFS service. The BlobFS operations currently in azcopy are supported by SharedKey credentials.

func NewBlobPipeline

func NewBlobPipeline(c azblob.Credential, o azblob.PipelineOptions, r XferRetryOptions, p *pacer) pipeline.Pipeline

NewBlobPipeline creates a Pipeline using the specified credentials and options.

func NewBlobXferRetryPolicyFactory

func NewBlobXferRetryPolicyFactory(o XferRetryOptions) pipeline.Factory

NewBlobXferRetryPolicyFactory creates a RetryPolicyFactory object configured using the specified options.

TODO: Fix the separate retry policies; use Azure blob's retry policy after the blob SDK with retry optimization is released.

func NewPacerPolicyFactory

func NewPacerPolicyFactory(p *pacer) pipeline.Factory

NewPacerPolicyFactory creates a factory that can create telemetry policy objects which add telemetry information to outgoing HTTP requests.

func NewVersionPolicyFactory

func NewVersionPolicyFactory() pipeline.Factory

NewVersionPolicyFactory creates a factory that can override the service version set in the request header. If the context has the key overwrite-current-version set to false, then x-ms-version in the request is not overwritten; otherwise x-ms-version is set to 2017-04-17.

func PutBlobUploadFunc

func PutBlobUploadFunc(jptm IJobPartTransferMgr, srcMmf *common.MMF, blockBlobUrl azblob.BlockBlobURL, pacer *pacer)

func ToFixed

func ToFixed(num float64, precision int) float64

ToFixed returns num with its precision limited to the given number of decimal places.
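One common way to limit a float to a fixed number of decimal places is to scale, round, and unscale. The sketch below assumes round-to-nearest behavior; `toFixed` is an illustrative re-implementation and may differ from the package's actual code.

```go
package main

import (
	"fmt"
	"math"
)

// toFixed scales num by 10^precision, rounds to the nearest integer,
// and scales back. Illustrative only; not the package's source.
func toFixed(num float64, precision int) float64 {
	scale := math.Pow(10, float64(precision))
	return math.Round(num*scale) / scale
}

func main() {
	fmt.Println(toFixed(12.3456, 2)) // prints 12.35
}
```

A helper like this is typically applied to values meant for display, such as a percentage in a progress summary.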

func URLToBlob

func URLToBlob(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer *pacer)

Types

type BlobFSFileDownload

type BlobFSFileDownload struct {
	// contains filtered or unexported fields
}

type CoordinatorChannels

type CoordinatorChannels struct {
	// contains filtered or unexported fields
}

type ErrorEx

type ErrorEx struct {
	// contains filtered or unexported fields
}

func (ErrorEx) ErrorCodeAndString

func (errex ErrorEx) ErrorCodeAndString() (int, string)

type IJobMgr

type IJobMgr interface {
	JobID() common.JobID
	JobPartMgr(partNum PartNumber) (IJobPartMgr, bool)
	//Throughput() XferThroughput
	AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, sourceSAS string,
		destinationSAS string, scheduleTransfers bool) IJobPartMgr
	SetIncludeExclude(map[string]int, map[string]int)
	IncludeExclude() (map[string]int, map[string]int)
	ResumeTransfers(appCtx context.Context)
	AllTransfersScheduled() bool
	ConfirmAllTransfersScheduled()
	ResetAllTransfersScheduled()
	PipelineLogInfo() pipeline.LogOptions
	ReportJobPartDone() uint32
	Context() context.Context
	Cancel()
	// TODO: added for debugging purpose. remove later
	OccupyAConnection()
	// TODO: added for debugging purpose. remove later
	ReleaseAConnection()
	// TODO: added for debugging purpose. remove later
	ActiveConnections() int64

	common.ILoggerCloser
	// contains filtered or unexported methods
}

type IJobPartMgr

type IJobPartMgr interface {
	Plan() *JobPartPlanHeader
	ScheduleTransfers(jobCtx context.Context)
	StartJobXfer(jptm IJobPartTransferMgr)
	ReportTransferDone() uint32
	IsForceWriteTrue() bool
	ScheduleChunks(chunkFunc chunkFunc)
	RescheduleTransfer(jptm IJobPartTransferMgr)
	BlobTiers() (blockBlobTier common.BlockBlobTier, pageBlobTier common.PageBlobTier)
	SAS() (string, string)
	//CancelJob()
	Close()
	// TODO: added for debugging purpose. remove later
	OccupyAConnection()
	// TODO: added for debugging purpose. remove later
	ReleaseAConnection()
	common.ILogger
}

type IJobPartTransferMgr

type IJobPartTransferMgr interface {
	FromTo() common.FromTo
	Info() TransferInfo
	BlobDstData(dataFileToXfer *common.MMF) (headers azblob.BlobHTTPHeaders, metadata azblob.Metadata)
	FileDstData(dataFileToXfer *common.MMF) (headers azfile.FileHTTPHeaders, metadata azfile.Metadata)
	PreserveLastModifiedTime() (time.Time, bool)
	BlobTiers() (blockBlobTier common.BlockBlobTier, pageBlobTier common.PageBlobTier)
	//ScheduleChunk(chunkFunc chunkFunc)
	Context() context.Context
	StartJobXfer()
	IsForceWriteTrue() bool
	ReportChunkDone() (lastChunk bool, chunksDone uint32)
	TransferStatus() common.TransferStatus
	SetStatus(status common.TransferStatus)
	SetErrorCode(errorCode int32)
	SetNumberOfChunks(numChunks uint32)
	ReportTransferDone() uint32
	RescheduleTransfer()
	ScheduleChunks(chunkFunc chunkFunc)
	Cancel()
	WasCanceled() bool
	// TODO: added for debugging purpose. remove later
	OccupyAConnection()
	// TODO: added for debugging purpose. remove later
	ReleaseAConnection()
	LogUploadError(source, destination, errorMsg string, status int)
	LogDownloadError(source, destination, errorMsg string, status int)
	LogS2SCopyError(source, destination, errorMsg string, status int)
	LogError(resource, context string, err error)
	LogTransferStart(source, destination, description string)
	common.ILogger
}
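The ReportChunkDone signature suggests an atomic counter that tells exactly one caller it finished the last chunk. The following is a sketch under that assumption; the `transfer` type and its fields are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// transfer is a hypothetical, cut-down transfer manager.
type transfer struct {
	numChunks  uint32
	chunksDone uint32 // updated only through sync/atomic
}

// ReportChunkDone increments the counter atomically. Only the caller whose
// increment reaches numChunks sees lastChunk == true.
func (t *transfer) ReportChunkDone() (lastChunk bool, chunksDone uint32) {
	chunksDone = atomic.AddUint32(&t.chunksDone, 1)
	return chunksDone == t.numChunks, chunksDone
}

func main() {
	t := &transfer{numChunks: 8}
	var wg sync.WaitGroup
	var lastReports uint32
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if last, _ := t.ReportChunkDone(); last {
				atomic.AddUint32(&lastReports, 1)
			}
		}()
	}
	wg.Wait()
	fmt.Println(lastReports) // prints 1: exactly one goroutine observed the last chunk
}
```

The caller that sees `lastChunk` is the natural place to run per-transfer completion work, since no lock is needed to decide who does it.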

type InMemoryTransitJobState

type InMemoryTransitJobState struct {
	// contains filtered or unexported fields
}

InMemoryTransitJobState defines job state that is passed in memory and not stored in the JobPartPlan file. Note: InMemoryTransitJobState should only be set when a request comes from the cmd (FE) module to the STE module. The in-memory CredentialInfo is currently maintained per job in STE, because FE could have a many-to-one relationship with STE, i.e. different jobs could have different OAuth tokens requested from FE, and these jobs can run at the same time in STE. This could be optimized if FE and STE were no longer separate modules.

type JobPartPlanDstBlob

type JobPartPlanDstBlob struct {

	// represents user decision to interpret the content-encoding from source file
	NoGuessMimeType bool

	// Specifies the length of MIME content type of the blob
	ContentTypeLength uint16

	// Specifies the MIME content type of the blob. The default type is application/octet-stream
	ContentType [ContentTypeMaxBytes]byte

	// Specifies length of content encoding which have been applied to the blob.
	ContentEncodingLength uint16

	// Specifies which content encodings have been applied to the blob.
	ContentEncoding [ContentEncodingMaxBytes]byte

	// Specifies the tier if this is a block or page blob
	BlockBlobTier common.BlockBlobTier
	PageBlobTier  common.PageBlobTier

	MetadataLength uint16
	Metadata       [MetadataMaxBytes]byte

	// Specifies the maximum size of block which determines the number of chunks and chunk size of a transfer
	BlockSize uint32
}

JobPartPlanDstBlob holds additional settings required when the destination is a blob
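The struct pairs each fixed-size byte array with a length field, which keeps the layout constant-sized for a memory-mapped file. A minimal sketch of writing and reading such a field follows; `dstBlob`, `setContentType`, and `contentType` are hypothetical helpers, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// contentTypeMaxBytes mirrors the ContentTypeMaxBytes constant.
const contentTypeMaxBytes = 256

// dstBlob is a cut-down, hypothetical version of JobPartPlanDstBlob.
type dstBlob struct {
	ContentTypeLength uint16
	ContentType       [contentTypeMaxBytes]byte
}

// setContentType copies s into the fixed array and records how many bytes are valid.
func (d *dstBlob) setContentType(s string) error {
	if len(s) > len(d.ContentType) {
		return errors.New("content type exceeds ContentTypeMaxBytes")
	}
	d.ContentTypeLength = uint16(copy(d.ContentType[:], s))
	return nil
}

// contentType returns only the valid prefix of the array.
func (d *dstBlob) contentType() string {
	return string(d.ContentType[:d.ContentTypeLength])
}

func main() {
	var d dstBlob
	if err := d.setContentType("text/plain"); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(d.contentType(), d.ContentTypeLength) // prints text/plain 10
}
```

The uint16 length fields are why the size constants carry the warning about exceeding the 16-bit range.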

type JobPartPlanDstLocal

type JobPartPlanDstLocal struct {

	// Specifies whether the timestamp of destination file has to be set to the modified time of source file
	PreserveLastModifiedTime bool
}

JobPartPlanDstLocal holds additional settings required when the destination is a local file

type JobPartPlanFileName

type JobPartPlanFileName string

func (JobPartPlanFileName) Create

Create creates the memory-mapped JobPartPlanHeader using the given JobPartOrder and JobPartPlanBlobData.

func (JobPartPlanFileName) Delete

func (jpfn JobPartPlanFileName) Delete() error

func (*JobPartPlanFileName) GetJobPartPlanPath

func (jppfn *JobPartPlanFileName) GetJobPartPlanPath() string

func (JobPartPlanFileName) Map

func (jpfn JobPartPlanFileName) Map() *JobPartPlanMMF

func (JobPartPlanFileName) Parse

func (jpfn JobPartPlanFileName) Parse() (jobID common.JobID, partNumber common.PartNumber, err error)

TODO: This needs testing

type JobPartPlanHeader

type JobPartPlanHeader struct {
	// Once set, the following fields are constants; they should never be modified
	Version             common.Version     // The version of data schema format of header; see the dataSchemaVersion constant
	JobID               common.JobID       // Job Part's JobID
	PartNum             common.PartNumber  // Job Part's part number (0+)
	IsFinalPart         bool               // True if this is the Job's last part; else false
	ForceWrite          bool               // True if the existing blobs need to be overwritten.
	Priority            common.JobPriority // The Job Part's priority
	TTLAfterCompletion  uint32             // Time to live after completion; used to persist the file on disk for the specified time after the completion of the JobPartOrder
	FromTo              common.FromTo      // The location of the transfer's source & destination
	CommandStringLength uint32
	NumTransfers        uint32              // The number of transfers in the Job part
	LogLevel            common.LogLevel     // This Job Part's minimal log level
	DstBlobData         JobPartPlanDstBlob  // Additional data for blob destinations
	DstLocalData        JobPartPlanDstLocal // Additional data for local destinations
	// contains filtered or unexported fields
}

JobPartPlanHeader represents the header of Job Part's memory-mapped file

func (*JobPartPlanHeader) CommandString

func (jpph *JobPartPlanHeader) CommandString() string

CommandString returns the command string given by user when job was created

func (*JobPartPlanHeader) JobStatus

func (jpph *JobPartPlanHeader) JobStatus() common.JobStatus

JobStatus returns the job status stored in JobPartPlanHeader in a thread-safe manner

func (*JobPartPlanHeader) SetJobStatus

func (jpph *JobPartPlanHeader) SetJobStatus(newJobStatus common.JobStatus)

SetJobStatus sets the job status in JobPartPlanHeader in a thread-safe manner
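Thread-safe getters and setters on a plain struct field are usually built on sync/atomic. The sketch below shows that approach as an assumption about how such accessors could work; `planHeader`, `jobStatus`, and the status constants are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// jobStatus is a hypothetical stand-in for common.JobStatus.
type jobStatus uint32

const (
	statusInProgress jobStatus = iota
	statusPaused
)

// planHeader is a cut-down, hypothetical header with a single status word.
type planHeader struct {
	atomicJobStatus uint32 // accessed only through sync/atomic
}

// JobStatus reads the status with an atomic load.
func (h *planHeader) JobStatus() jobStatus {
	return jobStatus(atomic.LoadUint32(&h.atomicJobStatus))
}

// SetJobStatus writes the status with an atomic store.
func (h *planHeader) SetJobStatus(s jobStatus) {
	atomic.StoreUint32(&h.atomicJobStatus, uint32(s))
}

func main() {
	h := &planHeader{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			h.SetJobStatus(statusPaused)
		}()
	}
	wg.Wait()
	fmt.Println(h.JobStatus() == statusPaused) // prints true
}
```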

func (*JobPartPlanHeader) Transfer

func (jpph *JobPartPlanHeader) Transfer(transferIndex uint32) *JobPartPlanTransfer

Transfer returns the memory-mapped JobPartPlanTransfer header for the given index

func (*JobPartPlanHeader) TransferSrcDstStrings

func (jpph *JobPartPlanHeader) TransferSrcDstStrings(transferIndex uint32) (source, destination string)

TransferSrcDstStrings returns the source and destination strings for the transfer at the given transferIndex in the JobPartOrder
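JobPartPlanTransfer stores SrcOffset, SrcLength, and DstLength, which suggests the strings are sliced out of the plan file's bytes. The sketch below assumes the destination string immediately follows the source string; that layout, and the `srcDstStrings` helper, are assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// srcDstStrings slices the source and destination strings out of buf.
// Assumption: the destination bytes start right after the source bytes.
func srcDstStrings(buf []byte, srcOffset int64, srcLen, dstLen int16) (src, dst string, err error) {
	if srcOffset < 0 || srcLen < 0 || dstLen < 0 {
		return "", "", errors.New("negative offset or length")
	}
	srcEnd := srcOffset + int64(srcLen)
	dstEnd := srcEnd + int64(dstLen)
	if dstEnd > int64(len(buf)) {
		return "", "", errors.New("strings extend past end of buffer")
	}
	return string(buf[srcOffset:srcEnd]), string(buf[srcEnd:dstEnd]), nil
}

func main() {
	src := "/data/a.txt"
	dst := "https://account.example/container/a.txt"
	// "HEADER" stands in for whatever precedes the strings in the file.
	buf := append([]byte("HEADER"), src+dst...)
	s, d, err := srcDstStrings(buf, 6, int16(len(src)), int16(len(dst)))
	fmt.Println(s, d, err)
}
```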

func (*JobPartPlanHeader) TransferSrcPropertiesAndMetadata

func (jpph *JobPartPlanHeader) TransferSrcPropertiesAndMetadata(transferIndex uint32) (h azblob.BlobHTTPHeaders, metadata common.Metadata, blobType azblob.BlobType)

TransferSrcPropertiesAndMetadata returns the SrcHTTPHeaders, properties and metadata for the transfer at the given transferIndex in the JobPartOrder.

TODO: Refactor return type to an object

type JobPartPlanMMF

type JobPartPlanMMF common.MMF

func (*JobPartPlanMMF) Plan

func (mmf *JobPartPlanMMF) Plan() *JobPartPlanHeader

func (*JobPartPlanMMF) Unmap

func (mmf *JobPartPlanMMF) Unmap()

type JobPartPlanTransfer

type JobPartPlanTransfer struct {

	// SrcOffset represents the actual start offset transfer header written in JobPartOrder file
	SrcOffset int64
	// SrcLength represents the actual length of source string for specific transfer
	SrcLength int16
	// DstLength represents the actual length of destination string for specific transfer
	DstLength int16
	// ChunkCount represents the num of chunks a transfer is split into
	//ChunkCount uint16	// TODO: Remove this, we need to determine it at runtime
	// ModifiedTime represents the last time at which source was modified before start of transfer stored as nanoseconds.
	ModifiedTime int64
	// SourceSize represents the actual size of the source on disk
	SourceSize int64
	// CompletionTime represents the time at which transfer was completed
	CompletionTime uint64

	// For S2S copy, per Transfer source's properties
	// TODO: ensure the length is enough
	SrcContentTypeLength        int16
	SrcContentEncodingLength    int16
	SrcContentLanguageLength    int16
	SrcContentDispositionLength int16
	SrcCacheControlLength       int16
	SrcContentMD5Length         int16
	SrcMetadataLength           int16
	SrcBlobTypeLength           int16
	// contains filtered or unexported fields
}

JobPartPlanTransfer represents the header of a Job Part's Transfer in the memory-mapped file

func (*JobPartPlanTransfer) ErrorCode

func (jppt *JobPartPlanTransfer) ErrorCode() int32

ErrorCode returns the transfer's errorCode.

func (*JobPartPlanTransfer) SetErrorCode

func (jppt *JobPartPlanTransfer) SetErrorCode(errorCode int32, overwrite bool)

SetErrorCode sets the error code of the error if the transfer failed. If the overwrite flag is true, the existing atomicErrorCode is overwritten. If the overwrite flag is false, the errorCode is not overwritten.

func (*JobPartPlanTransfer) SetTransferStatus

func (jppt *JobPartPlanTransfer) SetTransferStatus(status common.TransferStatus, overWrite bool)

SetTransferStatus sets the transfer's status. If the overWrite flag is true, a failed status is overwritten. If the overWrite flag is false, a transfer whose status is already failed keeps that status. The overWrite flag is used when resuming failed transfers, where the errorCode is reset to its default, i.e. 0.
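The overWrite semantics can be sketched with a compare-and-swap loop: an unconditional store when overWrite is true, and a store that refuses to replace a failed status otherwise. This is one plausible reading, with hypothetical names (`planTransfer`, `transferStatus`, and the status constants).

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// transferStatus is a hypothetical stand-in for common.TransferStatus.
type transferStatus int32

const (
	statusStarted transferStatus = iota
	statusComplete
	statusFailed
)

// planTransfer is a cut-down, hypothetical transfer header.
type planTransfer struct {
	atomicStatus int32 // accessed only through sync/atomic
}

func (t *planTransfer) TransferStatus() transferStatus {
	return transferStatus(atomic.LoadInt32(&t.atomicStatus))
}

// SetTransferStatus stores status. With overWrite == false, a failed status is sticky.
func (t *planTransfer) SetTransferStatus(status transferStatus, overWrite bool) {
	if overWrite {
		atomic.StoreInt32(&t.atomicStatus, int32(status))
		return
	}
	for {
		cur := atomic.LoadInt32(&t.atomicStatus)
		if transferStatus(cur) == statusFailed {
			return // keep the failed status
		}
		if atomic.CompareAndSwapInt32(&t.atomicStatus, cur, int32(status)) {
			return
		}
	}
}

func main() {
	t := &planTransfer{}
	t.SetTransferStatus(statusFailed, false)
	t.SetTransferStatus(statusComplete, false)          // ignored: already failed
	fmt.Println(t.TransferStatus() == statusFailed)     // prints true
	t.SetTransferStatus(statusStarted, true)            // resume: force overwrite
	fmt.Println(t.TransferStatus() == statusStarted)    // prints true
}
```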

func (*JobPartPlanTransfer) TransferStatus

func (jppt *JobPartPlanTransfer) TransferStatus() common.TransferStatus

TransferStatus returns the transfer's status

type PartNumber

type PartNumber = common.PartNumber

type SuicideJob

type SuicideJob struct{}

type TransferInfo

type TransferInfo struct {
	BlockSize   uint32
	Source      string
	SourceSize  int64
	Destination string

	SrcHTTPHeaders azblob.BlobHTTPHeaders // Used for S2S copy, where each transfer's source properties need to be set on the destination.
	SrcMetadata    common.Metadata

	// Transfer info for blob only
	SrcBlobType azblob.BlobType

	// NumChunks is the number of chunks in which transfer will be split into while uploading the transfer.
	// NumChunks is not used in case of AppendBlob transfer.
	NumChunks uint16
}
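BlockSize and SourceSize together determine how many chunks a transfer is split into. A ceiling division is the usual way to compute that; the sketch below is an assumption about the calculation, and `numChunks` is a hypothetical helper, including its choice to report one chunk for an empty source.

```go
package main

import "fmt"

// numChunks returns how many blockSize-sized chunks cover sourceSize bytes.
// Assumptions: an empty source still counts as one chunk, and a zero
// blockSize is treated as invalid and yields 0.
func numChunks(sourceSize int64, blockSize uint32) uint32 {
	if blockSize == 0 || sourceSize < 0 {
		return 0
	}
	if sourceSize == 0 {
		return 1
	}
	bs := int64(blockSize)
	return uint32((sourceSize + bs - 1) / bs) // ceiling division
}

func main() {
	const mib = 1024 * 1024
	fmt.Println(numChunks(10*mib, 4*mib)) // prints 3: two full chunks plus a 2 MiB tail
}
```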

type XferChannels

type XferChannels struct {
	// contains filtered or unexported fields
}

type XferRetryOptions

type XferRetryOptions struct {
	// Policy tells the pipeline what kind of retry policy to use. See the XferRetryPolicy* constants.
	// A value of zero means that you accept our default policy.
	Policy XferRetryPolicy

	// MaxTries specifies the maximum number of attempts an operation will be tried before producing an error (0=default).
	// A value of zero means that you accept our default policy. A value of 1 means 1 try and no retries.
	MaxTries int32

	// TryTimeout indicates the maximum time allowed for any single try of an HTTP request.
	// A value of zero means that you accept our default timeout. NOTE: When transferring large amounts
	// of data, the default TryTimeout will probably not be sufficient. You should override this value
	// based on the bandwidth available to the host machine and proximity to the Storage service. A good
	// starting point may be something like (60 seconds per MB of anticipated-payload-size).
	TryTimeout time.Duration

	// RetryDelay specifies the amount of delay to use before retrying an operation (0=default).
	// The delay increases (exponentially or linearly) with each retry up to a maximum specified by
	// MaxRetryDelay. If you specify 0, then you must also specify 0 for MaxRetryDelay.
	RetryDelay time.Duration

	// MaxRetryDelay specifies the maximum delay allowed before retrying an operation (0=default).
	// If you specify 0, then you must also specify 0 for RetryDelay.
	MaxRetryDelay time.Duration

	// RetryReadsFromSecondaryHost specifies whether the retry policy should retry a read operation against another host.
	// If RetryReadsFromSecondaryHost is "" (the default) then operations are not retried against another host.
	// NOTE: Before setting this field, make sure you understand the issues around reading stale & potentially-inconsistent
	// data at this webpage: https://docs.microsoft.com/en-us/azure/storage/common/storage-designing-ha-apps-with-ragrs
	RetryReadsFromSecondaryHost string // Comment this out for non-Blob SDKs
}

XferRetryOptions configures the retry policy's behavior.

type XferRetryPolicy

type XferRetryPolicy int32

XferRetryPolicy tells the pipeline what kind of retry policy to use. See the XferRetryPolicy* constants. This is a new retry policy, used instead of the existing policy in azblob's zc_retry_policy.go, because its behavior differs: it retries on all types of network errors instead of retrying only on temporary or timeout errors.

const (
	// RetryPolicyExponential tells the pipeline to use an exponential back-off retry policy
	RetryPolicyExponential XferRetryPolicy = 0

	// RetryPolicyFixed tells the pipeline to use a fixed back-off retry policy
	RetryPolicyFixed XferRetryPolicy = 1
)
