Documentation ¶
Index ¶
- Variables
- type ChunkFileService
- type ChunkFileServiceFactory
- type ChunkManagementService
- type ChunkManager
- func (m *ChunkManager) GetChunkSize() int
- func (m *ChunkManager) GetIncompleteUpload(filepath string) Upload
- func (m *ChunkManager) Init()
- func (m *ChunkManager) PutWithStatus(filepath string, filesize int, r io.Reader) chan UploadStatus
- func (m *ChunkManager) ResumePutWithStatus(filepath string, r io.Reader) (chan UploadStatus, error)
- func (m *ChunkManager) RunUploader()
- func (m *ChunkManager) Shutdown()
- func (m *ChunkManager) UploadExists(filepath string) bool
- type ChunkService
- type ChunkUpload
- func (u *ChunkUpload) GetFilepath() string
- func (u *ChunkUpload) GetUploaded() int
- func (u *ChunkUpload) IsComplete() bool
- func (u *ChunkUpload) IsProcessing() bool
- func (u *ChunkUpload) NumReceivedParts() int
- func (u *ChunkUpload) Run() error
- func (u *ChunkUpload) Send(r io.Reader, chunkSizeLimit int)
- func (u *ChunkUpload) SetIsProcessing(b bool)
- func (u *ChunkUpload) Shutdown()
- func (u *ChunkUpload) UpdateExpiry(tm time.Time)
- func (u *ChunkUpload) UpdateUploadedBytes(newPartBytes int)
- type DownloadParams
- type HeadInfo
- type HeadService
- type InitError
- type ListService
- type MD5Service
- type Object
- type ObjectIterator
- type ParallelService
- type RangeService
- type Service
- type ServiceWithConcurrency
- type ServiceWithMimeType
- type ServiceWithTimeout
- type SignedUrlConfig
- type SignedUrlService
- type SignedUrlUploadService
- type StreamService
- type Upload
- type UploadError
- type UploadStatus
- type UploadStatusCode
Constants ¶
This section is empty.
Variables ¶
var (
ErrUploadNotFound = errors.New("upload not found")
)
Functions ¶
This section is empty.
Types ¶
type ChunkFileService ¶
type ChunkFileService interface {
// IsInitialized should return true if the ChunkFileService has already been initialized.
IsInitialized() bool
// InitializeChunk initializes the chunk upload operation.
InitializeChunk(u *ChunkUpload) error
// UploadPart should upload the provided bytes to the cloud. Uploaded parts should be completely sequential.
UploadPart([]byte) error
// CompleteUpload should perform any necessary steps to finalize the multipart upload.
CompleteUpload() error
// Cleanup should delete the ChunkFileService and do any other cleanup necessary. It is only called when an
// incomplete upload has timed out.
Cleanup()
}
ChunkFileService is the service that actually performs the upload operations to a third party (e.g., local file system, S3, GCS).
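As an illustration of the contract above, the following is a minimal sketch of an in-memory implementation. The `ChunkUpload` and `ChunkFileService` declarations are local stand-ins so the example compiles on its own, and `memoryFileService` and `newMemoryFileService` are hypothetical names that are not part of this package. A real implementation would talk to a file system, S3, or GCS instead of a buffer.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// ChunkUpload is a local stand-in for the package type; only Filepath is used here.
type ChunkUpload struct {
	Filepath string
}

// ChunkFileService mirrors the interface documented above.
type ChunkFileService interface {
	IsInitialized() bool
	InitializeChunk(u *ChunkUpload) error
	UploadPart([]byte) error
	CompleteUpload() error
	Cleanup()
}

// memoryFileService is a hypothetical implementation that appends parts to a buffer.
type memoryFileService struct {
	filepath    string
	initialized bool
	completed   bool
	buf         bytes.Buffer
}

// Compile-time check that the sketch satisfies the interface.
var _ ChunkFileService = (*memoryFileService)(nil)

// newMemoryFileService has the shape of a ChunkFileServiceFactory target:
// one fresh service per upload.
func newMemoryFileService() *memoryFileService { return &memoryFileService{} }

func (s *memoryFileService) IsInitialized() bool { return s.initialized }

func (s *memoryFileService) InitializeChunk(u *ChunkUpload) error {
	if u == nil {
		return errors.New("nil upload")
	}
	s.filepath = u.Filepath
	s.initialized = true
	return nil
}

// UploadPart appends the part; parts are expected to arrive in order.
func (s *memoryFileService) UploadPart(p []byte) error {
	if !s.initialized {
		return errors.New("upload not initialized")
	}
	_, err := s.buf.Write(p)
	return err
}

func (s *memoryFileService) CompleteUpload() error {
	if !s.initialized {
		return errors.New("upload not initialized")
	}
	s.completed = true
	return nil
}

// Cleanup discards any partial data, as would happen after a timeout.
func (s *memoryFileService) Cleanup() {
	s.buf.Reset()
	s.initialized = false
}

func main() {
	fs := newMemoryFileService()
	if err := fs.InitializeChunk(&ChunkUpload{Filepath: "demo.txt"}); err != nil {
		fmt.Println("init failed:", err)
		return
	}
	for _, p := range []string{"hello, ", "world"} {
		if err := fs.UploadPart([]byte(p)); err != nil {
			fmt.Println("upload failed:", err)
			return
		}
	}
	if err := fs.CompleteUpload(); err != nil {
		fmt.Println("complete failed:", err)
		return
	}
	fmt.Println(fs.buf.String())
}
```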
type ChunkFileServiceFactory ¶
type ChunkFileServiceFactory func() ChunkFileService
ChunkFileServiceFactory creates a new ChunkFileService for each upload.
type ChunkManagementService ¶
type ChunkManagementService interface {
RunUploader()
Shutdown()
}
ChunkManagementService is an interface that defines an uploader which is managed. This means that the uploader runs a goroutine which receives parts of files through a channel and uploads them sequentially, rather than pushing the updates directly. By doing this, we can be certain that parts are uploaded sequentially.
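The managed pattern described above can be sketched roughly as follows. This is not the package's ChunkManager: `managedUploader`, `part`, and the channel layout are hypothetical, and the "upload" is just an append to a slice. The point is only that a single goroutine draining one channel handles parts one at a time, in the order they were sent.

```go
package main

import "fmt"

// part is a hypothetical unit of work: one slice of file data.
type part struct {
	data []byte
}

// managedUploader is an illustrative stand-in for a ChunkManagementService.
type managedUploader struct {
	parts    chan part     // unbuffered: a send returns only once the part is received
	quit     chan struct{} // closed by Shutdown
	done     chan struct{} // closed when RunUploader exits
	uploaded [][]byte      // touched only by RunUploader until done is closed
}

func newManagedUploader() *managedUploader {
	return &managedUploader{
		parts: make(chan part),
		quit:  make(chan struct{}),
		done:  make(chan struct{}),
	}
}

// RunUploader loops until Shutdown is called, processing one part at a time.
// It blocks, so it is meant to be started with the go keyword.
func (m *managedUploader) RunUploader() {
	defer close(m.done)
	for {
		select {
		case p := <-m.parts:
			m.uploaded = append(m.uploaded, p.data) // stand-in for the real upload
		case <-m.quit:
			return
		}
	}
}

// Shutdown signals the goroutine to exit and waits for it to finish.
func (m *managedUploader) Shutdown() {
	close(m.quit)
	<-m.done
}

func main() {
	m := newManagedUploader()
	go m.RunUploader()
	for _, s := range []string{"part-1", "part-2", "part-3"} {
		m.parts <- part{data: []byte(s)}
	}
	m.Shutdown()
	for i, p := range m.uploaded {
		fmt.Printf("%d: %s\n", i, p)
	}
}
```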
type ChunkManager ¶
type ChunkManager struct {
// ChunkSizeLimit defines the maximum size a chunk can be.
ChunkSizeLimit int
// IncompleteUploadExpiry defines the expiry duration of a chunk.
IncompleteUploadExpiry time.Duration
// FS creates the ChunkFileService that is required to run the ChunkManager. Each file service should handle one upload.
// For example, if an upload ID is required to sync between chunks, the implementation of ChunkFileService which
// is returned can store the required upload ID.
FS ChunkFileServiceFactory
// contains filtered or unexported fields
}
ChunkManager manages uploading chunks by providing a goroutine which performs the chunking. It contains a service that should be run on a goroutine.
Usage:
// Initialize
manager := &ChunkManager{...}
go manager.RunUploader()
defer manager.Shutdown()
// Send a first chunk (filesize is the size of the file)
notifier := manager.PutWithStatus("some filepath", filesize, bytes.NewReader(b))
for n := range notifier {
handleNotifications(n)
}
// To send a second chunk
notifier, err := manager.ResumePutWithStatus("some filepath", bytes.NewReader(b))
if err != nil {
// handle the error
}
for n := range notifier {
handleNotifications(n)
}
The notifier provides updates on the progress of the upload. The notifier returned from PutWithStatus is the same as the one returned from ResumePutWithStatus. Be sure to close the notifier once done.
The notifier sends notifications of type UploadStatus, which contain a code and either a message or an error. When Code is "Error", the Error field is populated. Otherwise, the Message field is populated.
When complete, the `UploadStatus` will return a message saying `Upload Complete`.
Implementation Notes:
The ChunkManager only manages the *reception* of data for a ChunkUpload. The actual logic for sending data in chunks is handled by the ChunkUpload. Consequently, the ChunkManager only registers the beginning and end of a chunk, and frees data when an incomplete chunk is too old.
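A handler for the notifier might look something like the sketch below. The `UploadStatus` and `UploadStatusCode` declarations are local copies of the types documented on this page, while `drain` and `errDemo` are hypothetical helpers. The channel here is filled and closed by the example itself, which is an assumption made for the demo rather than a statement about how the ChunkManager behaves.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented types, so the sketch compiles on its own.
type UploadStatusCode string

var (
	UploadStatusCodeOk       UploadStatusCode = "Ok"
	UploadStatusCodeProgress UploadStatusCode = "Progress"
	UploadStatusCodeError    UploadStatusCode = "Error"
)

type UploadStatus struct {
	Code    UploadStatusCode
	Message string
	Error   error
}

// errDemo is a hypothetical error used only to exercise the error path.
var errDemo = errors.New("demo failure")

// drain reads statuses until the channel is closed. It collects the messages
// of non-error statuses and returns the first error it sees.
func drain(notifier <-chan UploadStatus) ([]string, error) {
	var messages []string
	var firstErr error
	for n := range notifier {
		if n.Code == UploadStatusCodeError {
			if firstErr == nil {
				firstErr = n.Error
			}
			continue
		}
		messages = append(messages, n.Message)
	}
	return messages, firstErr
}

func main() {
	ok := make(chan UploadStatus, 2)
	ok <- UploadStatus{Code: UploadStatusCodeProgress, Message: "half way"}
	ok <- UploadStatus{Code: UploadStatusCodeOk, Message: "Upload Complete"}
	close(ok)
	fmt.Println(drain(ok))

	failed := make(chan UploadStatus, 1)
	failed <- UploadStatus{Code: UploadStatusCodeError, Error: errDemo}
	close(failed)
	fmt.Println(drain(failed))
}
```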
func (*ChunkManager) GetChunkSize ¶
func (m *ChunkManager) GetChunkSize() int
GetChunkSize returns the chunk size. Required for ChunkService.
func (*ChunkManager) GetIncompleteUpload ¶
func (m *ChunkManager) GetIncompleteUpload(filepath string) Upload
GetIncompleteUpload returns the incomplete upload. Required for ChunkService.
func (*ChunkManager) Init ¶
func (m *ChunkManager) Init()
func (*ChunkManager) PutWithStatus ¶
func (m *ChunkManager) PutWithStatus(filepath string, filesize int, r io.Reader) chan UploadStatus
PutWithStatus tells the ChunkManager to start the upload. It adds the upload to the upload queue and proceeds to send data from the provided io.Reader.
func (*ChunkManager) ResumePutWithStatus ¶
func (m *ChunkManager) ResumePutWithStatus(filepath string, r io.Reader) (chan UploadStatus, error)
ResumePutWithStatus tells the ChunkManager to resume the upload by sending more data. It assumes the data passed to PutWithStatus was not enough to complete the upload.
func (*ChunkManager) RunUploader ¶
func (m *ChunkManager) RunUploader()
RunUploader starts the goroutine that actually performs the upload for a chunk. Note that this is an infinite loop and so should be run in a goroutine.
e.g.,
go manager.RunUploader()
func (*ChunkManager) Shutdown ¶
func (m *ChunkManager) Shutdown()
Shutdown sends a signal to exit the goroutine.
func (*ChunkManager) UploadExists ¶
func (m *ChunkManager) UploadExists(filepath string) bool
UploadExists returns true if the upload exists.
type ChunkService ¶
type ChunkService interface {
PutWithStatus(filepath string, filesize int, r io.Reader) chan UploadStatus
ResumePutWithStatus(filepath string, r io.Reader) (chan UploadStatus, error)
GetIncompleteUpload(filepath string) Upload
GetChunkSize() int
}
ChunkService provides the methods required to upload a file in chunks.
type ChunkUpload ¶
type ChunkUpload struct {
// Filepath for the upload.
Filepath string
// FileService for running chunks.
FS ChunkFileService
// The expiry time for the upload. If the upload has expired, then we should not perform any further processing
// on it.
Expiry time.Time
// Number of chunks for this upload.
Chunks int
// contains filtered or unexported fields
}
ChunkUpload is a specific upload related to the chunk.
func (*ChunkUpload) GetFilepath ¶
func (u *ChunkUpload) GetFilepath() string
GetFilepath returns the filepath. Required to implement Upload interface.
func (*ChunkUpload) GetUploaded ¶
func (u *ChunkUpload) GetUploaded() int
GetUploaded returns the number of bytes already uploaded. Required to implement Upload interface.
func (*ChunkUpload) IsComplete ¶
func (u *ChunkUpload) IsComplete() bool
IsComplete returns true when the number of parts received is the same as the expected number of chunks.
func (*ChunkUpload) IsProcessing ¶
func (u *ChunkUpload) IsProcessing() bool
IsProcessing returns true if the required number of chunks hasn't been reached.
func (*ChunkUpload) NumReceivedParts ¶
func (u *ChunkUpload) NumReceivedParts() int
NumReceivedParts returns the number of parts already received for the chunk upload.
func (*ChunkUpload) Run ¶
func (u *ChunkUpload) Run() error
Run performs the logic for this specific upload, ensuring that all upload items have been provided.
func (*ChunkUpload) Send ¶
func (u *ChunkUpload) Send(r io.Reader, chunkSizeLimit int)
Send sends the data to the goroutine that performs the actual upload in chunks conforming to the chunkSizeLimit.
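One way to cut a reader into pieces that respect a size limit is sketched below. It is not the body of Send: `splitChunks` and `chunkStrings` are hypothetical helpers, and the sketch collects the chunks in a slice instead of handing them to an upload goroutine.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// splitChunks reads r to the end and returns chunks of at most limit bytes.
// Every chunk except possibly the last is exactly limit bytes long.
func splitChunks(r io.Reader, limit int) ([][]byte, error) {
	if limit <= 0 {
		return nil, fmt.Errorf("chunk size limit must be positive, got %d", limit)
	}
	var chunks [][]byte
	for {
		buf := make([]byte, limit)
		n, err := io.ReadFull(r, buf)
		if n > 0 {
			chunks = append(chunks, buf[:n])
		}
		// io.EOF means nothing was left; io.ErrUnexpectedEOF means a short final chunk.
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return chunks, nil
		}
		if err != nil {
			return chunks, err
		}
	}
}

// chunkStrings is a convenience wrapper for trying splitChunks on a string.
func chunkStrings(s string, limit int) ([]string, error) {
	chunks, err := splitChunks(strings.NewReader(s), limit)
	out := make([]string, 0, len(chunks))
	for _, c := range chunks {
		out = append(out, string(c))
	}
	return out, err
}

func main() {
	got, err := chunkStrings("abcdefg", 3)
	if err != nil {
		fmt.Println("split failed:", err)
		return
	}
	fmt.Println(got) // prints [abc def g]
}
```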
func (*ChunkUpload) SetIsProcessing ¶
func (u *ChunkUpload) SetIsProcessing(b bool)
func (*ChunkUpload) Shutdown ¶
func (u *ChunkUpload) Shutdown()
func (*ChunkUpload) UpdateExpiry ¶
func (u *ChunkUpload) UpdateExpiry(tm time.Time)
UpdateExpiry allows updating of the expiry date, for example, when the upload is continued.
func (*ChunkUpload) UpdateUploadedBytes ¶
func (u *ChunkUpload) UpdateUploadedBytes(newPartBytes int)
type DownloadParams ¶
type DownloadParams struct {
// A custom concurrency.
Concurrency int
}
DownloadParams are parameters specific to ParallelService.Download. The implementation should handle a nil value properly. The fields are custom values, supplied only as needed.
type HeadService ¶
HeadService provides a way for a data service to return statistical information such as the content length and last modified date from the data service.
type ListService ¶
type ListService interface {
// Objects should return an iterator for all objects in a bucket.
Objects() (ObjectIterator, func())
}
ListService allows for objects to be listed.
type MD5Service ¶
MD5Service provides a way for a data service to return MD5 hash from data object attributes.
type Object ¶
type Object struct {
// Filepath for the object.
Filepath string
// ContentType is the MIME type of the object's content.
ContentType string
// Size is the length of the object's content.
Size int64
}
Object is metadata relating to an object in the data service.
type ObjectIterator ¶
type ObjectIterator interface {
// Next returns the next result. Its second return value is iterator.Done if
// there are no more results. Once Next returns iterator.Done, all subsequent
// calls will return iterator.Done.
Next() (*Object, error)
}
ObjectIterator iterates through objects in a list.
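A caller typically loops on Next until the done sentinel comes back. The sketch below shows that loop with a local `errDone` value standing in for `iterator.Done`, so the example has no external dependency. `sliceIterator` and `totalSize` are hypothetical, and `Object` and `ObjectIterator` are local copies of the documented types.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented types.
type Object struct {
	Filepath    string
	ContentType string
	Size        int64
}

type ObjectIterator interface {
	Next() (*Object, error)
}

// errDone is a local stand-in for the iterator.Done sentinel.
var errDone = errors.New("no more items in iterator")

// sliceIterator is a hypothetical iterator over a fixed slice of objects.
type sliceIterator struct {
	objects []Object
	pos     int
}

// Next returns the next object, or errDone once the slice is exhausted.
// Every call after exhaustion keeps returning errDone.
func (it *sliceIterator) Next() (*Object, error) {
	if it.pos >= len(it.objects) {
		return nil, errDone
	}
	o := &it.objects[it.pos]
	it.pos++
	return o, nil
}

// totalSize walks the iterator until the sentinel and sums the object sizes.
func totalSize(it ObjectIterator) (int64, error) {
	var total int64
	for {
		o, err := it.Next()
		if errors.Is(err, errDone) {
			return total, nil
		}
		if err != nil {
			return total, err
		}
		total += o.Size
	}
}

func main() {
	it := &sliceIterator{objects: []Object{
		{Filepath: "a.txt", ContentType: "text/plain", Size: 3},
		{Filepath: "b.txt", ContentType: "text/plain", Size: 4},
	}}
	total, err := totalSize(it)
	fmt.Println(total, err)
}
```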
type ParallelService ¶
type ParallelService interface {
Download(filepath string, w io.WriterAt, p *DownloadParams) error
Upload(filepath string, filesize int, r io.Reader) error
}
ParallelService allows upload and download of files in parallel. This is not to be confused with uploading in chunks. Uploading in chunks assumes that the provided reader only has a part of the information, whereas for parallel Upload, the reader contains all the data to be uploaded. Only the act of uploading is done in parallel.
A parallel service should also satisfy the `ServiceWithConcurrency` interface.
type RangeService ¶
type RangeService interface {
DownloadRange(filepath string, w io.WriterAt, start, finish int) error
}
RangeService allows for a part of the file to be downloaded. Specify the half-open range [start, finish) of the download, and the result will be written into the io.WriterAt.
For example, if start=0 and finish=5, the function should return bytes 0-4.
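The half-open range maps directly onto a Go slice expression, as the sketch below suggests. `memoryRangeService` and `memoryWriterAt` are hypothetical types over in-memory data. Writing the result at offset 0 of the writer is an assumption of this sketch, since the interface does not say where in the io.WriterAt the bytes should land.

```go
package main

import (
	"fmt"
	"io"
)

// memoryWriterAt is a minimal io.WriterAt backed by a growable byte slice.
type memoryWriterAt struct {
	buf []byte
}

func (w *memoryWriterAt) WriteAt(p []byte, off int64) (int, error) {
	if off < 0 {
		return 0, fmt.Errorf("negative offset %d", off)
	}
	end := int(off) + len(p)
	if end > len(w.buf) {
		grown := make([]byte, end)
		copy(grown, w.buf)
		w.buf = grown
	}
	copy(w.buf[off:], p)
	return len(p), nil
}

// memoryRangeService is a hypothetical RangeService over in-memory files.
type memoryRangeService struct {
	files map[string][]byte
}

// DownloadRange writes bytes start..finish-1 of the file to w.
// Assumption: the range is written at offset 0 of w.
func (s *memoryRangeService) DownloadRange(filepath string, w io.WriterAt, start, finish int) error {
	data, ok := s.files[filepath]
	if !ok {
		return fmt.Errorf("%s: not found", filepath)
	}
	if start < 0 || finish < start || finish > len(data) {
		return fmt.Errorf("invalid range [%d, %d) for %d bytes", start, finish, len(data))
	}
	_, err := w.WriteAt(data[start:finish], 0)
	return err
}

func main() {
	svc := &memoryRangeService{files: map[string][]byte{"digits": []byte("0123456789")}}
	w := &memoryWriterAt{}
	if err := svc.DownloadRange("digits", w, 0, 5); err != nil {
		fmt.Println("download failed:", err)
		return
	}
	fmt.Println(string(w.buf)) // prints 01234
}
```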
type Service ¶
type Service interface {
Exists(filepath string) (bool, error)
Get(filepath string) (io.ReadCloser, error)
Put(filepath string, r io.Reader) error
Delete(filepath string) error
}
Service defines a basic data service. It allows the user to Get, Put, Delete, and check that a certain piece of data exists. Categorization is by filepath (not necessarily / delimited). To use this service, encapsulate it in another interface.
For example:
type FileService interface {
data.Service
data.HeadService
// ....
}
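For testing code that depends on such an interface, a small in-memory implementation can be handy. The sketch below is one possible shape: `memoryService`, `newMemoryService`, and `roundTrip` are hypothetical names, and `Service` is redeclared locally so the example compiles without importing the package. Treating Delete of a missing key as a no-op is a choice of this sketch, not documented behavior.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// Service is a local copy of the documented interface.
type Service interface {
	Exists(filepath string) (bool, error)
	Get(filepath string) (io.ReadCloser, error)
	Put(filepath string, r io.Reader) error
	Delete(filepath string) error
}

// memoryService is a hypothetical Service backed by a map. It is not safe for
// concurrent use.
type memoryService struct {
	files map[string][]byte
}

// Compile-time check that the sketch satisfies the interface.
var _ Service = (*memoryService)(nil)

func newMemoryService() *memoryService {
	return &memoryService{files: map[string][]byte{}}
}

func (s *memoryService) Exists(filepath string) (bool, error) {
	_, ok := s.files[filepath]
	return ok, nil
}

func (s *memoryService) Get(filepath string) (io.ReadCloser, error) {
	data, ok := s.files[filepath]
	if !ok {
		return nil, fmt.Errorf("%s: not found", filepath)
	}
	return io.NopCloser(bytes.NewReader(data)), nil
}

func (s *memoryService) Put(filepath string, r io.Reader) error {
	data, err := io.ReadAll(r)
	if err != nil {
		return err
	}
	s.files[filepath] = data
	return nil
}

// Delete removes the entry; deleting a missing key is a no-op in this sketch.
func (s *memoryService) Delete(filepath string) error {
	delete(s.files, filepath)
	return nil
}

// roundTrip stores content under filepath and reads it back.
func roundTrip(svc Service, filepath, content string) (string, error) {
	if err := svc.Put(filepath, strings.NewReader(content)); err != nil {
		return "", err
	}
	rc, err := svc.Get(filepath)
	if err != nil {
		return "", err
	}
	defer rc.Close()
	data, err := io.ReadAll(rc)
	return string(data), err
}

func main() {
	svc := newMemoryService()
	got, err := roundTrip(svc, "reports/2024.txt", "hello")
	fmt.Println(got, err)
}
```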
type ServiceWithConcurrency ¶
type ServiceWithConcurrency interface {
SetConcurrency(concurrency int)
}
ServiceWithConcurrency allows a concurrency to be set for the service. This should be used for services which implement ParallelService. Typically, this should *not* be included in the service description, but optionally set if the service allows it.
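Because the interface is optional, callers would usually probe for it with a type assertion instead of requiring it in the service description. A minimal sketch of that probing, with hypothetical `parallelStore`, `plainStore`, and `configureConcurrency` names and a local copy of the interface:

```go
package main

import "fmt"

// ServiceWithConcurrency is a local copy of the documented interface.
type ServiceWithConcurrency interface {
	SetConcurrency(concurrency int)
}

// parallelStore is a hypothetical service that supports a concurrency setting.
type parallelStore struct {
	concurrency int
}

func (s *parallelStore) SetConcurrency(concurrency int) { s.concurrency = concurrency }

// plainStore is a hypothetical service that does not.
type plainStore struct{}

// configureConcurrency applies n only when svc opts in to the interface,
// and reports whether it did.
func configureConcurrency(svc interface{}, n int) bool {
	c, ok := svc.(ServiceWithConcurrency)
	if !ok {
		return false
	}
	c.SetConcurrency(n)
	return true
}

func main() {
	p := &parallelStore{}
	fmt.Println(configureConcurrency(p, 4), p.concurrency)
	fmt.Println(configureConcurrency(&plainStore{}, 4))
}
```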
type ServiceWithMimeType ¶
ServiceWithMimeType allows the MimeType/ContentType to be set for a specific file that has been uploaded. This service is optional.
type ServiceWithTimeout ¶
ServiceWithTimeout allows a timeout to be set for a service. Typically, this should *not* be included in the service description, but optionally set if the service allows it.
type SignedUrlConfig ¶
func (SignedUrlConfig) GetContentType ¶
func (c SignedUrlConfig) GetContentType() string
func (SignedUrlConfig) GetDisposition ¶
func (c SignedUrlConfig) GetDisposition() string
type SignedUrlService ¶
type SignedUrlService interface {
SignedUrl(filepath string, tm time.Duration, cfg *SignedUrlConfig) (string, error)
}
SignedUrlService allows a service to return a signed URL for a filepath.
type SignedUrlUploadService ¶
type SignedUrlUploadService interface {
PutSignedUrl(filepath string, tm time.Duration, cfg *SignedUrlConfig) (string, error)
}
SignedUrlUploadService allows a service to return a signed URL for uploading an object to a filepath.
type StreamService ¶
StreamService allows for data to be streamed. Pass in the writer which should be streamed to.
type UploadError ¶
type UploadError struct {
// contains filtered or unexported fields
}
func (UploadError) Error ¶
func (e UploadError) Error() string
type UploadStatus ¶
type UploadStatus struct {
Code UploadStatusCode
Message string
Error error
}
func ErrUploadStatus ¶
func ErrUploadStatus(err error) UploadStatus
func OkUploadStatus ¶
func OkUploadStatus() UploadStatus
type UploadStatusCode ¶
type UploadStatusCode string
var (
UploadStatusCodeOk UploadStatusCode = "Ok"
UploadStatusCodeProgress UploadStatusCode = "Progress"
UploadStatusCodeError UploadStatusCode = "Error"
)