data

package
v0.0.0-...-74e38f5 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 17, 2025 License: MIT Imports: 7 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrUploadNotFound = errors.New("upload not found")
)

Functions

This section is empty.

Types

type ChunkFileService

type ChunkFileService interface {
	// IsInitialized should return true if the ChunkFileService has already been initialized.
	IsInitialized() bool

	// InitializeChunk initializes the chunk upload operation.
	InitializeChunk(u *ChunkUpload) error

	// UploadPart should upload the provided bytes to the cloud. Uploaded parts should be completely sequential.
	UploadPart([]byte) error

	// CompleteUpload should perform any necessary steps to finalize the multipart upload.
	CompleteUpload() error

	// Cleanup should delete the ChunkFileService and do any other cleanup necessary. It is only called when an
	// incomplete upload has timed out.
	Cleanup()
}

ChunkFileService is the service that actually performs the upload operations to a third party (e.g., local file system, S3, GCS).
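The lifecycle described by the interface comments (initialize, upload parts in order, complete, clean up on timeout) can be illustrated with a minimal in-memory sketch. The `ChunkUpload` and `ChunkFileService` declarations below are local stand-ins copied from the signatures on this page so that the example compiles by itself. `memoryFileService` and `uploadAll` are hypothetical names and not part of the package.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// ChunkUpload is a local stand-in that carries only the Filepath field.
type ChunkUpload struct {
	Filepath string
}

// ChunkFileService mirrors the interface documented above.
type ChunkFileService interface {
	IsInitialized() bool
	InitializeChunk(u *ChunkUpload) error
	UploadPart([]byte) error
	CompleteUpload() error
	Cleanup()
}

// memoryFileService is a hypothetical implementation that appends each part
// to an in-memory buffer instead of sending it to a third party.
type memoryFileService struct {
	initialized bool
	completed   bool
	filepath    string
	buf         bytes.Buffer
}

// Compile-time check that the sketch satisfies the interface.
var _ ChunkFileService = (*memoryFileService)(nil)

func (s *memoryFileService) IsInitialized() bool { return s.initialized }

func (s *memoryFileService) InitializeChunk(u *ChunkUpload) error {
	if u == nil {
		return errors.New("nil upload")
	}
	s.filepath = u.Filepath
	s.initialized = true
	return nil
}

func (s *memoryFileService) UploadPart(p []byte) error {
	if !s.initialized {
		return errors.New("upload not initialized")
	}
	_, err := s.buf.Write(p) // parts arrive sequentially, so appending keeps order
	return err
}

func (s *memoryFileService) CompleteUpload() error {
	if !s.initialized {
		return errors.New("upload not initialized")
	}
	s.completed = true
	return nil
}

func (s *memoryFileService) Cleanup() {
	s.buf.Reset()
	s.initialized = false
}

// uploadAll runs the full lifecycle and returns the stored bytes as a string.
func uploadAll(fs *memoryFileService, path string, parts ...string) (string, error) {
	if err := fs.InitializeChunk(&ChunkUpload{Filepath: path}); err != nil {
		return "", err
	}
	for _, p := range parts {
		if err := fs.UploadPart([]byte(p)); err != nil {
			return "", err
		}
	}
	if err := fs.CompleteUpload(); err != nil {
		return "", err
	}
	return fs.buf.String(), nil
}

func main() {
	got, err := uploadAll(&memoryFileService{}, "a.txt", "hello, ", "world")
	fmt.Println(got, err)
}
```

A real implementation for S3 or GCS would presumably keep the provider's upload ID in the struct in place of the buffer.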

type ChunkFileServiceFactory

type ChunkFileServiceFactory func() ChunkFileService

ChunkFileServiceFactory creates a new ChunkFileService for each upload.

type ChunkManagementService

type ChunkManagementService interface {
	RunUploader()
	Shutdown()
}

ChunkManagementService is an interface that defines an uploader which is managed. This means that the uploader runs a goroutine which receives parts of files through a channel and uploads them sequentially, rather than pushing the updates directly. By doing this, we can be certain that parts are uploaded sequentially.
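The managed-uploader idea can be sketched as a single goroutine that drains a channel of parts. This is an illustration of the pattern only, not the package's implementation: `sequentialUploader`, its fields, and `send` are hypothetical, and the `stored` slice stands in for the remote destination.

```go
package main

import "fmt"

// sequentialUploader is a hypothetical managed uploader. Parts are handed over
// on a channel and consumed by one goroutine, so they are processed in order.
type sequentialUploader struct {
	parts   chan []byte
	quit    chan struct{}
	stopped chan struct{}
	stored  []string // stands in for the remote destination
}

func newSequentialUploader() *sequentialUploader {
	return &sequentialUploader{
		parts:   make(chan []byte),
		quit:    make(chan struct{}),
		stopped: make(chan struct{}),
	}
}

// RunUploader blocks until Shutdown is called, so run it in a goroutine.
func (u *sequentialUploader) RunUploader() {
	defer close(u.stopped)
	for {
		select {
		case p := <-u.parts:
			u.stored = append(u.stored, string(p))
		case <-u.quit:
			return
		}
	}
}

// Shutdown signals the loop to exit and waits for it to finish.
// It must be called exactly once, after all sends have returned.
func (u *sequentialUploader) Shutdown() {
	close(u.quit)
	<-u.stopped
}

// send hands one part to the uploader goroutine.
func (u *sequentialUploader) send(p []byte) { u.parts <- p }

// uploadInOrder pushes the parts through the uploader and returns what was stored.
func uploadInOrder(parts ...string) []string {
	u := newSequentialUploader()
	go u.RunUploader()
	for _, p := range parts {
		u.send([]byte(p))
	}
	u.Shutdown()
	return u.stored
}

func main() {
	fmt.Println(uploadInOrder("part-1", "part-2", "part-3"))
}
```

Because only one goroutine appends to `stored`, no lock is needed, and reading it after `Shutdown` returns is safe.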

type ChunkManager

type ChunkManager struct {
	// ChunkSizeLimit defines the maximum size a chunk can be.
	ChunkSizeLimit int

	// IncompleteUploadExpiry defines the expiry duration of a chunk.
	IncompleteUploadExpiry time.Duration

	// FS creates the ChunkFileService instances required to run the ChunkManager. Each file service should handle
	// one upload. For example, if an upload ID is required to sync between chunks, the ChunkFileService
	// implementation that is returned can store the required upload ID.
	FS ChunkFileServiceFactory
	// contains filtered or unexported fields
}

ChunkManager manages uploading chunks by providing a goroutine which performs the chunking. It contains a service that should be run on a goroutine.

Usage:

// Initialize
manager := &ChunkManager{...}
go manager.RunUploader()
defer manager.Shutdown()

// Send a first chunk
notifier := manager.PutWithStatus("some filepath", filesize, bytes.NewReader(b))
for n := range notifier {
	handleNotifications(n)
}

// To send a second chunk
notifier, err := manager.ResumePutWithStatus("some filepath", bytes.NewReader(b))
if err != nil {
	// handle the error
}
for n := range notifier {
	handleNotifications(n)
}

The notifier is used to provide updates on the progress of the upload. The notifier returned from PutWithStatus is the same as the one from ResumePutWithStatus. Be sure to close the notifier once done.

The notifier sends notifications of type UploadStatus, each of which contains a code and either a message or an error. If Code is "Error", the Error field is populated. Otherwise, the Message field is populated.

When complete, the `UploadStatus` will return a message saying `Upload Complete`.
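A notification handler might look like the following sketch. The `UploadStatus` and `UploadStatusCode` declarations are local copies of the types listed on this page. `waitForUpload`, `fakeNotifier`, and `errBoom` are hypothetical helpers, and the progress message text is made up for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented status types.
type UploadStatusCode string

var (
	UploadStatusCodeOk       UploadStatusCode = "Ok"
	UploadStatusCodeProgress UploadStatusCode = "Progress"
	UploadStatusCodeError    UploadStatusCode = "Error"
)

type UploadStatus struct {
	Code    UploadStatusCode
	Message string
	Error   error
}

var errBoom = errors.New("boom") // hypothetical failure used in the example

// waitForUpload drains the notifier until it is closed. It returns the last
// message seen and the first error reported, if any.
func waitForUpload(notifier <-chan UploadStatus) (string, error) {
	var (
		last     string
		firstErr error
	)
	for n := range notifier {
		if n.Code == UploadStatusCodeError {
			if firstErr == nil {
				firstErr = n.Error
			}
			continue
		}
		last = n.Message
	}
	return last, firstErr
}

// fakeNotifier returns a closed, pre-filled channel that imitates a notifier.
func fakeNotifier(statuses ...UploadStatus) chan UploadStatus {
	ch := make(chan UploadStatus, len(statuses))
	for _, s := range statuses {
		ch <- s
	}
	close(ch)
	return ch
}

func main() {
	msg, err := waitForUpload(fakeNotifier(
		UploadStatus{Code: UploadStatusCodeProgress, Message: "part received"},
		UploadStatus{Code: UploadStatusCodeOk, Message: "Upload Complete"},
	))
	fmt.Println(msg, err)
}
```

Draining the channel fully, even after an error, avoids leaving a sender blocked on an unbuffered notifier.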

Implementation Notes:

The ChunkManager only manages the *reception* of data for a ChunkUpload. The actual logic for sending data in chunks is handled by the ChunkUpload. Consequently, the ChunkManager only registers the beginning and end of a chunk, and frees data when an incomplete chunk is too old.

func (*ChunkManager) GetChunkSize

func (m *ChunkManager) GetChunkSize() int

GetChunkSize returns the chunk size. Required for ChunkService.

func (*ChunkManager) GetIncompleteUpload

func (m *ChunkManager) GetIncompleteUpload(filepath string) Upload

GetIncompleteUpload returns the incomplete upload. Required for ChunkService.

func (*ChunkManager) Init

func (m *ChunkManager) Init()

func (*ChunkManager) PutWithStatus

func (m *ChunkManager) PutWithStatus(filepath string, filesize int, r io.Reader) chan UploadStatus

PutWithStatus tells the ChunkManager to start the upload. It adds the upload to the upload queue and proceeds to send data from the provided io.Reader.

func (*ChunkManager) ResumePutWithStatus

func (m *ChunkManager) ResumePutWithStatus(filepath string, r io.Reader) (chan UploadStatus, error)

ResumePutWithStatus tells the ChunkManager to resume the upload by sending more data. It assumes the data passed to PutWithStatus was not enough to complete the upload.

func (*ChunkManager) RunUploader

func (m *ChunkManager) RunUploader()

RunUploader runs the loop that actually performs the upload for a chunk. Note that this is an infinite loop, so it should be run in a goroutine.

e.g.,

go manager.RunUploader()

func (*ChunkManager) Shutdown

func (m *ChunkManager) Shutdown()

Shutdown sends a signal to exit the goroutine.

func (*ChunkManager) UploadExists

func (m *ChunkManager) UploadExists(filepath string) bool

UploadExists returns true if the upload exists.

type ChunkService

type ChunkService interface {
	PutWithStatus(filepath string, filesize int, r io.Reader) chan UploadStatus
	ResumePutWithStatus(filepath string, r io.Reader) (chan UploadStatus, error)
	GetIncompleteUpload(filepath string) Upload
	GetChunkSize() int
}

ChunkService provides the methods required to upload a file in chunks.

type ChunkUpload

type ChunkUpload struct {
	// Filepath for the upload.
	Filepath string

	// FileService for running chunks.
	FS ChunkFileService

	// The expiry time for the upload. If the upload has expired, then we should not perform any further processing
	// on it.
	Expiry time.Time

	// Number of chunks for this upload.
	Chunks int
	// contains filtered or unexported fields
}

ChunkUpload represents a single upload that is sent in chunks.

func (*ChunkUpload) GetFilepath

func (u *ChunkUpload) GetFilepath() string

GetFilepath returns the filepath. Required to implement Upload interface.

func (*ChunkUpload) GetUploaded

func (u *ChunkUpload) GetUploaded() int

GetUploaded returns the number of bytes already uploaded. Required to implement Upload interface.

func (*ChunkUpload) IsComplete

func (u *ChunkUpload) IsComplete() bool

IsComplete returns true when the number of parts received is the same as the expected number of chunks.

func (*ChunkUpload) IsProcessing

func (u *ChunkUpload) IsProcessing() bool

IsProcessing returns true if the required number of chunks hasn't been reached.

func (*ChunkUpload) NumReceivedParts

func (u *ChunkUpload) NumReceivedParts() int

NumReceivedParts returns the number of parts already received for the chunk upload.

func (*ChunkUpload) Run

func (u *ChunkUpload) Run() error

Run performs the logic for this specific upload, ensuring that all upload items have been provided.

func (*ChunkUpload) Send

func (u *ChunkUpload) Send(r io.Reader, chunkSizeLimit int)

Send sends the data to the goroutine that performs the actual upload in chunks conforming to the chunkSizeLimit.
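Splitting a reader into pieces that respect a size limit can be sketched as follows. This shows the general technique, not the body of Send. `splitIntoChunks` and `chunkStrings` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// splitIntoChunks reads r to EOF and returns chunks of at most limit bytes.
// Every chunk except possibly the last is exactly limit bytes long.
func splitIntoChunks(r io.Reader, limit int) ([][]byte, error) {
	if limit <= 0 {
		return nil, fmt.Errorf("chunk size limit must be positive, got %d", limit)
	}
	var chunks [][]byte
	for {
		buf := make([]byte, limit)
		n, err := io.ReadFull(r, buf)
		if n > 0 {
			chunks = append(chunks, buf[:n])
		}
		// io.EOF means nothing was left; io.ErrUnexpectedEOF means a short final chunk.
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return chunks, nil
		}
		if err != nil {
			return chunks, err
		}
	}
}

// chunkStrings is a convenience wrapper that works on strings.
func chunkStrings(s string, limit int) []string {
	chunks, err := splitIntoChunks(strings.NewReader(s), limit)
	if err != nil {
		return nil
	}
	out := make([]string, 0, len(chunks))
	for _, c := range chunks {
		out = append(out, string(c))
	}
	return out
}

func main() {
	fmt.Println(chunkStrings("abcdefg", 3)) // prints [abc def g]
}
```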

func (*ChunkUpload) SetIsProcessing

func (u *ChunkUpload) SetIsProcessing(b bool)

func (*ChunkUpload) Shutdown

func (u *ChunkUpload) Shutdown()

func (*ChunkUpload) UpdateExpiry

func (u *ChunkUpload) UpdateExpiry(tm time.Time)

UpdateExpiry allows updating of the expiry date, for example, when the upload is continued.

func (*ChunkUpload) UpdateUploadedBytes

func (u *ChunkUpload) UpdateUploadedBytes(newPartBytes int)

type DownloadParams

type DownloadParams struct {
	// A custom concurrency.
	Concurrency int
}

DownloadParams are parameters specific to ParallelService.Download. Implementations should handle a nil *DownloadParams properly. The fields are optional custom values, to be set as needed.

type HeadInfo

type HeadInfo struct {
	Exists        bool
	LastModified  time.Time
	ContentLength int64
}

HeadInfo is the information returned by the HeadService.

type HeadService

type HeadService interface {
	Head(filepath string) (*HeadInfo, error)
}

HeadService provides a way for a data service to return statistical information such as the content length and last modified date from the data service.

type InitError

type InitError struct {
	// contains filtered or unexported fields
}

func (InitError) Error

func (e InitError) Error() string

type ListService

type ListService interface {
	// Objects should return an iterator for all objects in a bucket.
	Objects() (ObjectIterator, func())
}

ListService allows for objects to be listed.

type MD5Service

type MD5Service interface {
	MD5(filepath string) ([]byte, error)
}

MD5Service provides a way for a data service to return MD5 hash from data object attributes.

type Object

type Object struct {
	// Filepath for the object.
	Filepath string

	// ContentType is the MIME type of the object's content.
	ContentType string

	// Size is the length of the object's content.
	Size int64
}

Object is metadata relating to an object in the data service.

type ObjectIterator

type ObjectIterator interface {
	// Next returns the next result. Its second return value is iterator.Done if
	// there are no more results. Once Next returns iterator.Done, all subsequent
	// calls will return iterator.Done.
	Next() (*Object, error)
}

ObjectIterator iterates through objects in a list.
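Consuming an iterator with this contract usually takes the form of a loop that stops on the sentinel error. In the sketch below, `Object` and `ObjectIterator` are local copies of the declarations on this page. `errDone` is a local stand-in for `iterator.Done` (which is assumed here to come from `google.golang.org/api/iterator`), and `sliceIterator` and `totalSize` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented types.
type Object struct {
	Filepath    string
	ContentType string
	Size        int64
}

type ObjectIterator interface {
	Next() (*Object, error)
}

// errDone is a local stand-in for iterator.Done so the example has no
// external dependencies.
var errDone = errors.New("no more items in iterator")

// sliceIterator is a hypothetical iterator over an in-memory slice.
type sliceIterator struct {
	objs []Object
	pos  int
}

func (it *sliceIterator) Next() (*Object, error) {
	if it.pos >= len(it.objs) {
		return nil, errDone // every later call keeps returning errDone
	}
	o := &it.objs[it.pos]
	it.pos++
	return o, nil
}

// totalSize walks the iterator to the end and sums the object sizes.
func totalSize(it ObjectIterator) (int64, error) {
	var total int64
	for {
		o, err := it.Next()
		if errors.Is(err, errDone) {
			return total, nil
		}
		if err != nil {
			return total, err
		}
		total += o.Size
	}
}

func main() {
	it := &sliceIterator{objs: []Object{
		{Filepath: "a.txt", Size: 3},
		{Filepath: "b.txt", Size: 4},
	}}
	fmt.Println(totalSize(it))
}
```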

type ParallelService

type ParallelService interface {
	Download(filepath string, w io.WriterAt, p *DownloadParams) error
	Upload(filepath string, filesize int, r io.Reader) error
}

ParallelService allows upload and download of files in parallel. This is not to be confused with uploading in chunks. Uploading in chunks assumes that the provided reader only has a part of the information, whereas for parallel Upload, the reader contains all the data to be uploaded. Only the act of uploading is done in parallel.

A parallel service should also satisfy the `ServiceWithConcurrency` interface.

type RangeService

type RangeService interface {
	DownloadRange(filepath string, w io.WriterAt, start, finish int) error
}

RangeService allows part of a file to be downloaded. Specify the half-open range [start, finish) to download, and the result is written to the io.WriterAt.

For example, if start=0 and finish=5, the function should return bytes 0-4.
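The half-open range semantics can be checked with a small in-memory sketch. `memoryRange`, `sliceWriterAt`, and `rangeString` are hypothetical. Writing the result at offset 0 of the writer is an assumption of this example, since the page does not say which offset an implementation should use.

```go
package main

import (
	"fmt"
	"io"
)

// memoryRange is a hypothetical in-memory store with a DownloadRange method
// matching the RangeService signature.
type memoryRange struct {
	files map[string][]byte
}

func (m memoryRange) DownloadRange(filepath string, w io.WriterAt, start, finish int) error {
	data, ok := m.files[filepath]
	if !ok {
		return fmt.Errorf("%s: not found", filepath)
	}
	if start < 0 || finish < start || finish > len(data) {
		return fmt.Errorf("invalid range [%d, %d) for %d bytes", start, finish, len(data))
	}
	// Half-open range: data[start:finish] excludes the byte at index finish.
	_, err := w.WriteAt(data[start:finish], 0) // offset 0 is an assumption
	return err
}

// sliceWriterAt is a minimal io.WriterAt backed by a growable byte slice.
type sliceWriterAt struct {
	buf []byte
}

func (s *sliceWriterAt) WriteAt(p []byte, off int64) (int, error) {
	if off < 0 {
		return 0, fmt.Errorf("negative offset %d", off)
	}
	end := int(off) + len(p)
	if end > len(s.buf) {
		s.buf = append(s.buf, make([]byte, end-len(s.buf))...)
	}
	copy(s.buf[int(off):], p)
	return len(p), nil
}

// rangeString downloads [start, finish) of content and returns it as a string.
func rangeString(content string, start, finish int) (string, error) {
	store := memoryRange{files: map[string][]byte{"f": []byte(content)}}
	w := &sliceWriterAt{}
	if err := store.DownloadRange("f", w, start, finish); err != nil {
		return "", err
	}
	return string(w.buf), nil
}

func main() {
	fmt.Println(rangeString("0123456789", 0, 5)) // prints 01234 <nil>
}
```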

type Service

type Service interface {
	Exists(filepath string) (bool, error)
	Get(filepath string) (io.ReadCloser, error)
	Put(filepath string, r io.Reader) error

	Delete(filepath string) error
}

Service defines a basic data service. It allows the user to Get, Put, Delete, and check that a certain piece of data exists. Categorization is by filepath (not necessarily / delimited). To use this service, encapsulate it in another interface.

For example:

type FileService interface {
    data.Service
    data.HeadService
    // ....
}
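For testing code that depends on Service, an in-memory implementation can be handy. The sketch below declares a local copy of the Service interface so that it compiles by itself. `memoryService` and `roundTrip` are hypothetical, and returning no error when deleting a missing key is an assumption rather than documented behaviour.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
	"sync"
)

// Service is a local copy of the documented interface.
type Service interface {
	Exists(filepath string) (bool, error)
	Get(filepath string) (io.ReadCloser, error)
	Put(filepath string, r io.Reader) error
	Delete(filepath string) error
}

// memoryService is a hypothetical map-backed Service, safe for concurrent use.
type memoryService struct {
	mu    sync.RWMutex
	files map[string][]byte
}

var _ Service = (*memoryService)(nil)

func newMemoryService() *memoryService {
	return &memoryService{files: make(map[string][]byte)}
}

func (s *memoryService) Exists(filepath string) (bool, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.files[filepath]
	return ok, nil
}

func (s *memoryService) Get(filepath string) (io.ReadCloser, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	data, ok := s.files[filepath]
	if !ok {
		return nil, fmt.Errorf("%s: not found", filepath)
	}
	return io.NopCloser(bytes.NewReader(data)), nil
}

func (s *memoryService) Put(filepath string, r io.Reader) error {
	data, err := io.ReadAll(r)
	if err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.files[filepath] = data
	return nil
}

func (s *memoryService) Delete(filepath string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.files, filepath) // deleting a missing key is treated as success
	return nil
}

// roundTrip stores content under path and reads it back.
func roundTrip(s Service, path, content string) (string, error) {
	if err := s.Put(path, strings.NewReader(content)); err != nil {
		return "", err
	}
	rc, err := s.Get(path)
	if err != nil {
		return "", err
	}
	defer rc.Close()
	b, err := io.ReadAll(rc)
	return string(b), err
}

func main() {
	fmt.Println(roundTrip(newMemoryService(), "dir/a.txt", "hello"))
}
```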

type ServiceWithConcurrency

type ServiceWithConcurrency interface {
	SetConcurrency(concurrency int)
}

ServiceWithConcurrency allows a concurrency to be set for the service. This should be used for services which implement ParallelService. Typically, this should *not* be included in the service description, but optionally set if the service allows it.
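One way to treat such an interface as optional is a type assertion at the call site. The sketch below declares a local copy of ServiceWithConcurrency. `parallelStore`, `plainStore`, and `configureConcurrency` are hypothetical.

```go
package main

import "fmt"

// ServiceWithConcurrency is a local copy of the documented interface.
type ServiceWithConcurrency interface {
	SetConcurrency(concurrency int)
}

// parallelStore is a hypothetical service that supports a concurrency setting.
type parallelStore struct {
	concurrency int
}

func (p *parallelStore) SetConcurrency(concurrency int) { p.concurrency = concurrency }

// plainStore is a hypothetical service without a concurrency setting.
type plainStore struct{}

// configureConcurrency applies n if the service supports it and reports
// whether the setting was applied.
func configureConcurrency(svc interface{}, n int) bool {
	c, ok := svc.(ServiceWithConcurrency)
	if !ok {
		return false
	}
	c.SetConcurrency(n)
	return true
}

func main() {
	p := &parallelStore{}
	fmt.Println(configureConcurrency(p, 4), p.concurrency)
	fmt.Println(configureConcurrency(plainStore{}, 4))
}
```

The same pattern would apply to ServiceWithTimeout and ServiceWithMimeType, which are likewise described as optional.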

type ServiceWithMimeType

type ServiceWithMimeType interface {
	SetMimeType(filepath, mimeType string) error
}

ServiceWithMimeType allows the MIME type (Content-Type) to be set for a specific file that has been uploaded. This service is optional.

type ServiceWithTimeout

type ServiceWithTimeout interface {
	SetTimeout(tm time.Duration)
}

ServiceWithTimeout allows a timeout to be set for a service. Typically, this should *not* be included in the service description, but optionally set if the service allows it.

type SignedUrlConfig

type SignedUrlConfig struct {
	Download    bool
	Filename    string
	ContentType string
}

func (SignedUrlConfig) GetContentType

func (c SignedUrlConfig) GetContentType() string

func (SignedUrlConfig) GetDisposition

func (c SignedUrlConfig) GetDisposition() string

type SignedUrlService

type SignedUrlService interface {
	SignedUrl(filepath string, tm time.Duration, cfg *SignedUrlConfig) (string, error)
}

SignedUrlService allows a service to return a signed URL for a filepath.

type SignedUrlUploadService

type SignedUrlUploadService interface {
	PutSignedUrl(filepath string, tm time.Duration, cfg *SignedUrlConfig) (string, error)
}

SignedUrlUploadService allows a service to return a signed URL for uploading an object to a filepath.

type StreamService

type StreamService interface {
	Stream(filepath string, w io.Writer) error
}

StreamService allows for data to be streamed. Pass in the writer which should be streamed to.

type Upload

type Upload interface {
	GetFilepath() string
	GetUploaded() int
}

Upload refers to an instance of upload. It allows for chunked and resumable uploads.

type UploadError

type UploadError struct {
	// contains filtered or unexported fields
}

func (UploadError) Error

func (e UploadError) Error() string

type UploadStatus

type UploadStatus struct {
	Code    UploadStatusCode
	Message string
	Error   error
}

func ErrUploadStatus

func ErrUploadStatus(err error) UploadStatus

func OkUploadStatus

func OkUploadStatus() UploadStatus

type UploadStatusCode

type UploadStatusCode string

var (
	UploadStatusCodeOk       UploadStatusCode = "Ok"
	UploadStatusCodeProgress UploadStatusCode = "Progress"
	UploadStatusCodeError    UploadStatusCode = "Error"
)
