largefile

package
v0.0.0-...-69242ac Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 7, 2025 License: Apache-2.0 Imports: 25 Imported by: 2

README

Package cloudeng.io/file/largefile

import cloudeng.io/file/largefile

Variables

ErrCacheInvalidBlockSize, ErrCacheInvalidOffset, ErrCacheUncachedRange, ErrInternalError
ErrCacheInvalidBlockSize = errors.New("invalid block size")
ErrCacheInvalidOffset = errors.New("invalid offset")
ErrCacheUncachedRange = errors.New("uncached range")
ErrInternalError = &internalError{}

ErrNotEnoughSpace
ErrNotEnoughSpace = errors.New("not enough space available for the requested operation")

Functions

Func CacheFilesExist
func CacheFilesExist(data, index string) (bool, bool, error)

CacheFilesExist checks if the cache and index files exist.

Func CreateNewFilesForCache
func CreateNewFilesForCache(ctx context.Context, filename, indexFileName string, contentSize int64, blockSize, concurrency int, progressCh chan<- int64) error

CreateNewFilesForCache creates a new cache file and an index file for caching byte ranges of large files. It will remove any existing files with the same names before creating new ones. It reserves space for the cache file and initializes the index file with the specified content size and block size. It returns an error if the files cannot be created or if the space cannot be reserved. The index file is used to track which byte ranges have been written to the cache. The cache file is used to store the actual data. The contentSize is the total size of the file in bytes, blockSize is the preferred block size for downloading the file, and concurrency is the number of concurrent writes used to reserve space for the cache file on systems that require writing to the file to reserve space (e.g., non-Linux systems).

Func NewBackoff
func NewBackoff(initial time.Duration, steps int) ratecontrol.Backoff

NewBackoff creates a new backoff instance that implements an exponential backoff algorithm unless the RetryResponse specifies a specific backoff duration. The backoff will continue for the specified number of steps, after which it will return true to indicate that no more retries should be attempted.

Func NumBlocks
func NumBlocks(contentSize int64, blockSize int) int

NumBlocks calculates the number of blocks required to cover the content size given the specified block size. It returns the number of blocks needed. If the content size is not a multiple of the block size, it adds an additional block to cover the remaining bytes.

Func OpenCacheFiles
func OpenCacheFiles(data, index string) (CacheFileReadWriter, CacheFileReadWriter, error)

OpenCacheFiles opens the cache and index files for reading and writing.

Func Ranges
func Ranges(from, to int64, blockSize int) iter.Seq[ByteRange]

Ranges returns an iterator for all byte ranges over the specified range and block size. Each range is inclusive of the 'From' byte and the 'To' byte. The ranges are generated in blocks of the specified size, with the last block potentially being smaller than the specified block size.

Func ReserveSpace
func ReserveSpace(ctx context.Context, filename string, size int64, blockSize, concurrency int, progressCh chan<- int64) error

ReserveSpace creates a file with the specified filename and allocates the specified size bytes to it. It verifies that the file was created with the requested storage allocated. On systems that support space reservation, such as Linux, space is reserved accordingly, on others data is written to the file to ensure that the space is allocated. The intent is to ensure that a download operations never fails because of insufficient local space once it has been initiated. Progress can be reported via the progressCh channel, which will receive updates on the amount of space reserved. The channel will be closed when ReserveSpace returns. If concurrency is 0 or less, it defaults to the number of CPU cores available on the system.

Types

Type ByteRange
type ByteRange struct {
	From int64 // Inclusive start of the range.
	To   int64 // Inclusive end of the range.
}

ByteRange represents a range of bytes in a file. The range is inclusive of the 'From' byte and the 'To' byte as per the HTTP Range header specification/convention.

Functions
func RangeForIndex(index int, contentSize int64, blockSize int) ByteRange

RangeForIndex returns the byte range for the specified block index in a series of blocks of the specified size over the content size. The range is inclusive of the 'From' byte and the 'To' byte. If the index is out of bounds, it returns an invalid range with From and To set to -1.

Methods
func (br ByteRange) Size() int64
func (br ByteRange) String() string
Type ByteRanges
type ByteRanges struct {
	// contains filtered or unexported fields
}

ByteRanges represents a collection of equally sized (apart from the last range), contiguous, byte ranges that can be used to track which parts of a file have or have not been 'processed', e.g downloaded, cached, uploaded etc. The ranges are represented as a bitmap, where each bit corresponds to a block of bytes of the specified size. The bitmap is used to efficiently track which byte ranges are set (processed) and which are clear (not processed). ByteRanges also allows for the contiguous head of the byte ranges to be tracked asynchronously. ByteRanges is thread-safe and can be used concurrently by multiple goroutines.

Functions
func NewByteRanges(contentSize int64, blockSize int) *ByteRanges

NewByteRanges creates a new ByteRanges instance with the specified content size and block size. The content size is the total size of the file in bytes, and the block size is the size of each byte range in bytes.

Methods
func (br *ByteRanges) AllClear(start int) iter.Seq[ByteRange]

AllClear returns an iterator for all clear byte ranges starting from 'start'. A read lock is held while iterating over the byte ranges, hence calling any other method, such as Set, which takes a write lock will block until the iteration is complete. Use NextClear if finer-grained control is needed.

func (br *ByteRanges) AllSet(start int) iter.Seq[ByteRange]

AllSet returns an iterator for all set byte ranges starting from 'start'. A read lock is held while iterating over the byte ranges, hence calling any other method, such as Set, which takes a write lock will block until the iteration is complete. Use NextSet if finer-grained control is needed.

func (br ByteRanges) Block(pos int64) int

Block returns the block index for the specified position. It returns -1 if the position is out of bounds.

func (br ByteRanges) BlockSize() int

BlockSize returns the size of each block in bytes.

func (br ByteRanges) ContentLength() int64

ContentLength returns the total size of the content in bytes.

func (br *ByteRanges) IsClear(pos int64) bool

IsClear checks if the byte range for the specified position is clear.

func (br *ByteRanges) IsSet(pos int64) bool

IsSet checks if the byte range for the specified position is set.

func (br *ByteRanges) MarshalJSON() ([]byte, error)

MarshalJSON implements the json.Marshaler interface for ByteRanges.

func (br *ByteRanges) NextClear(start int, nbr *ByteRange) int

NextClear returns the next clear byte range starting from 'start'. It starts searching from the specified start index and returns the index of the next outstanding range which can be used to continue searching for the next outstanding range. The index will be -1 if there are no more outstanding ranges.

for start := NextClear(0, &br); start >= 0; start = NextClear(start, &br) {
    // Do something with the byte range br.
}
func (br *ByteRanges) NextSet(start int, nbr *ByteRange) int

NextSet returns the next set byte range starting from 'start' and behaves similarly to NextClear.

func (br *ByteRanges) Notify() <-chan struct{}

Notify returns a channel that is closed when the contiguous byte ranges starting at the first byte (ie. 0) are extended.

func (br ByteRanges) NumBlocks() int

NumBlocks returns the number of blocks required to cover the byte ranges represented by this ByteRanges instance.

func (br *ByteRanges) Set(pos int64)

Set marks the byte range for the specified position as set. It has no effect if the position is out of bounds.

func (br *ByteRanges) Tail() (ByteRange, bool)

Tail returns the contiguous byte ranage that starts at the first byte and extends to the last contiguous byte from the start that has been set. It returns false if no ranges have been set.

func (br *ByteRanges) UnmarshalJSON(data []byte) error

UnmarshalJSON implements the json.Unmarshaler interface for ByteRanges.

Type ByteRangesTracker
type ByteRangesTracker struct {
	// contains filtered or unexported fields
}

ByteRangesTracker tracks byte ranges but is not thread safe and does not support tracking the contiguous head of the byte ranges.

Functions
func NewByteRangesTracker(contentSize int64, blockSize int) *ByteRangesTracker

NewByteRangesTracker creates a new ByteRangesTracker instance with the specified content size and block size.

Methods
func (br ByteRangesTracker) Block(pos int64) int

Block returns the block index for the specified position. It returns -1 if the position is out of bounds.

func (br ByteRangesTracker) BlockSize() int

BlockSize returns the size of each block in bytes.

func (br *ByteRangesTracker) Clear(pos int64)

Clear clears the byte range for the specified position.

func (br ByteRangesTracker) ContentLength() int64

ContentLength returns the total size of the content in bytes.

func (br ByteRangesTracker) IsClear(pos int64) bool

IsClear checks if the byte range for the specified position is clear.

func (br ByteRangesTracker) IsSet(pos int64) bool

IsSet checks if the byte range for the specified position is set.

func (br *ByteRangesTracker) MarshalJSON() ([]byte, error)
func (br ByteRangesTracker) NextClear(start int, nbr *ByteRange) int

NextClear returns the next clear byte range starting from 'start'. It starts searching from the specified start index and returns the index of the next outstanding range which can be used to continue searching for the next outstanding range. The index will be -1 if there are no more outstanding ranges.

for start := NextClear(0, &br); start >= 0; start = NextClear(start, &br) {
    // Do something with the byte range br.
}
func (br ByteRangesTracker) NextSet(start int, nbr *ByteRange) int

NextSet returns the next set byte range starting from 'start' and behaves similarly to NextClear.

func (br ByteRangesTracker) NumBlocks() int

NumBlocks returns the number of blocks required to cover the byte ranges represented by this ByteRanges instance.

func (br *ByteRangesTracker) Set(pos int64)

Set marks the byte range for the specified position as set. It has no effect if the position is out of bounds.

func (br *ByteRangesTracker) UnmarshalJSON(data []byte) error
Type CacheFileReadWriter
type CacheFileReadWriter interface {
	io.Reader
	io.Writer
	io.ReaderAt
	io.WriterAt
	io.Closer
	Name() string // Name returns the name of the file.
	Sync() error  // Sync ensures that all writes to the cache file are flushed to disk.
}

CacheFileReadWriter is an interface that combines the functionality of io.Reader, io.ReaderAt, io.WriterAt, and io.Closer for reading and writing to the cache and index files. It also includes a Sync method to ensure that all writes are flushed to disk.

Type CachingDownloader
type CachingDownloader struct {
	// contains filtered or unexported fields
}

CachingDownloader is a downloader that caches streamed downloaded data to a local cache and supports resuming downloads from where they left off.

Functions
func NewCachingDownloader(file Reader, cache DownloadCache, opts ...DownloadOption) (*CachingDownloader, error)

NewCachingDownloader creates a new CachingDownloader instance.

Methods
func (dl *CachingDownloader) Run(ctx context.Context) (DownloadStatus, error)

Run executes the downloaded process. If the downloader encounters any errors it will return an

Type DownloadCache
type DownloadCache interface {
	// ContentLengthAndBlockSize returns the total length of the file in bytes
	// and the block size used for downloading the file.
	ContentLengthAndBlockSize() (int64, int)

	// CachedBytesAndBlocks returns the total number of bytes and blocks already stored in
	// the cache.
	CachedBytesAndBlocks() (bytes, blocks int64)

	// NextOutstanding finds the next byte range that has not been cached
	// starting from the specified 'start' index. Its return value is either
	// -1 if there are no more outstanding ranges, or the value of the next
	// starting index to continue searching at.
	// To iterate over all outstanding ranges, call this method repeatedly
	// until it returns -1 as follows:
	//    for start := NextOutstanding(0, &br); start != -1; start = NextOutstanding(start, &br) {
	//        // Do something with the byte range br.
	//    }
	NextOutstanding(start int, br *ByteRange) int

	// NextCached finds the next byte range that has been cached in the same manner
	// as NextOutstanding.
	NextCached(start int, br *ByteRange) int

	// Tail returns the contiguous range of bytes that have been cached so far.
	// If this has not grown since the last call to Tail, Tail will block until
	// the tail is extended. If the context is done before the tail is extended,
	// it returns a ByteRange with From and To set to -1.
	Tail(context.Context) ByteRange

	// Complete returns true if all byte ranges have been cached.
	Complete() bool

	// WriteAt writes at most blocksize bytes starting at the specified offset.
	// Offset must be aligned with the block boundaries. It returns an error if
	// data is not exactly blocksize bytes long, unless the offset is at the
	// end of the file, in which case it must be (content length % blocksize)
	// bytes long.
	WriteAt(data []byte, off int64) (int, error)

	// ReadAt reads at most len(data) bytes starting at the specified offset.
	// It returns an error if any of the data to be read is not already cached.
	// The offset need not be aligned with the block boundaries.
	ReadAt(data []byte, off int64) (int, error)
}

DownloadCache is an interface for caching byte ranges of large files to support resumable downloads.

Type DownloadOption
type DownloadOption func(*downloadOptions)
Functions
func WithDownloadConcurrency(n int) DownloadOption

WithDownloadConcurrency sets the number of concurrent download goroutines.

func WithDownloadDigest(h digests.Hash) DownloadOption

WithDownloadDigest sets the hash function to be used for computing the digest of the downloaded data as it is streamed.

func WithDownloadLogger(logger *slog.Logger) DownloadOption

WithDownloadLogger sets the logger for the download.

func WithDownloadProgress(progress chan<- DownloadStats) DownloadOption

WithDownloadProgress sets the channel to report download progress.

func WithDownloadProgressTimeout(timeout time.Duration) DownloadOption

WithDownloadProgressDelay sets a timeout for certain progress updates, such as those sent when a download is completed to allow for the caller to process any pending updates. Routine updates, such as those sent whenever a new byte range is downloaded are sent on a best-effort basis.

func WithDownloadRateController(rc ratecontrol.Limiter) DownloadOption

WithDownloadRateController sets the rate controller for the download.

func WithDownloadWaitForCompletion(wait bool) DownloadOption

WithDownloadWaitForCompletion sets whether the download should iterate, until the download is successfully completed, or return after one iteration. An iteration represents a single pass through the download process whereby every outstsanding byte range is attempted to be downloaded once with retries. A download will either complete after any specified retries or be left outstanding for the next iteration.

Type DownloadStats
type DownloadStats struct {
	CachedOrStreamedBytes  int64 // Total bytes cached or streamed.
	CachedOrStreamedBlocks int64 // Total blocks cached or streamed.
	CacheErrors            int64 // Total number of errors encountered while caching.
	DownloadedBytes        int64 // Total bytes downloaded so far.
	DownloadedBlocks       int64 // Total blocks downloaded so far.
	DownloadSize           int64 // Total size of the file in bytes.
	DownloadBlocks         int64 // Total number of blocks to download.
	DownloadRetries        int64 // Total number of retries made during the download.
	DownloadErrors         int64 // Total number of errors encountered during the download.
	Iterations             int64 // Number of iterations requiredd to complete the download.
}

DownloadStats statistics on the download process and is used for progress reporting.

Type DownloadStatus
type DownloadStatus struct {
	DownloadStats
	Resumable bool          // Indicates if the download can be re-run.
	Complete  bool          // Indicates if the download completed successfully.
	Duration  time.Duration // Total duration of the download.
}

DownloadStatus holds the status for a completed download operation, including the progress made, whether the download is resumable, completed and the total duration of operation.

Type LocalDownloadCache
type LocalDownloadCache struct {
	// contains filtered or unexported fields
}

LocalDownloadCache is a concrete implementation of RangeCache that uses a local file to cache byte ranges of large files. It allows for concurrent access.

Functions
func NewLocalDownloadCache(dataReadWriter, indexReadWriter CacheFileReadWriter) (*LocalDownloadCache, error)

NewLocalDownloadCache creates a new LocalDownloadCache instance. It opens the cache file and loads the index file containing the byte ranges that have been written to the cache. It returns an error if the files cannot be opened or if the index file cannot be loaded. The cache file is used to store the actual data, and the index file is used to track which byte ranges have been written to the cache. The cache and index files must already exist and are expected to be have been created using NewFilesForCache.

Methods
func (c *LocalDownloadCache) CachedBytesAndBlocks() (bytes, blocks int64)
func (c *LocalDownloadCache) Close() error

Close implements DownloadCache.

func (c *LocalDownloadCache) Complete() bool

Complete implements DownloadCache. It returns true if all byte ranges have been cached, meaning there are no more uncached ranges.

func (c *LocalDownloadCache) ContentLengthAndBlockSize() (int64, int)

ContentLengthAndBlockSize implements DownloadCache.

func (c *LocalDownloadCache) NextCached(start int, br *ByteRange) int

NextCached implements DownloadCache. It returns the next, if any, cached byte range starting from the specified index.

func (c *LocalDownloadCache) NextOutstanding(start int, br *ByteRange) int

NextOutstanding implements DownloadCache. It returns the next, if any, uncached byte range starting from the specified index.

func (c *LocalDownloadCache) ReadAt(data []byte, off int64) (int, error)

ReadAt implements DownloadCache.

func (c *LocalDownloadCache) Tail(ctx context.Context) ByteRange

Tail implements DownloadCache. It returns the contiguous range of bytes that have been cached so far. If this has not grown since the last call to Tail, Tail will block until the tail is extended.

func (c *LocalDownloadCache) WriteAt(data []byte, off int64) (int, error)

WriteAt implements DownloadCache.

Type Reader
type Reader interface {
	Name() string // Name returns the name of the file being read.

	// ContentLengthAndBlockSize returns the total length of the file in bytes
	// and the preferred block size used for downloading the file.
	ContentLengthAndBlockSize() (int64, int)

	// Digest returns an instance of digests.Hash that can be used to
	// compute and validate the digest of the file.
	// If the file does not have a digest digest.IsSet() will return false.
	Digest() digests.Hash

	// GetReader retrieves a byte range from the file and returns
	// a reader that can be used to access that data range. In addition to the
	// error, the RetryResponse is returned which indicates whether the
	// operation can be retried and the duration to wait before retrying.
	GetReader(ctx context.Context, from, to int64) (io.ReadCloser, RetryResponse, error)
}

Reader provides support for downloading very large files efficiently concurrently and to allow for resumption of partial downloads.

Type RetryResponse
type RetryResponse interface {
	// IsRetryable checks if the error is retryable.
	IsRetryable() bool

	// BackoffDuration returns true if a specific backoff duration is specified
	// in the response, in which case the duration is returned. If false
	// no specific backoff duration is requested and the backoff algorithm
	// should fallback to something appropriate, such as exponential backoff.
	BackoffDuration() (bool, time.Duration)
}

RetryResponse allows the caller to determine whether an operation that failed with a retryable error can be retried and how long to wait before retrying the operation.

Type StreamingDownloader
type StreamingDownloader struct {
	// contains filtered or unexported fields
}

StreamingDownloader is a downloader that streams data from a large file. The downloader uses concurrent byte range requests to fetch data and then serializes the responses into a single stream for reading.

Functions
func NewStreamingDownloader(file Reader, opts ...DownloadOption) *StreamingDownloader

NewStreamingDownloader creates a new StreamingDownloader instance.

Methods
func (dl *StreamingDownloader) ContentLength() int64

ContentLength returns the content length header for the file being downloaded.

func (dl *StreamingDownloader) Read(buf []byte) (int, error)

Read implements io.Reader.

func (dl *StreamingDownloader) Reader() io.Reader

Reader returns an io.Reader.

func (dl *StreamingDownloader) Run(ctx context.Context) (StreamingStatus, error)
Type StreamingStatus
type StreamingStatus struct {
	DownloadStats
	OutOfOrder    int64         // Total number of out-of-order responses encountered.
	MaxOutOfOrder int64         // Maximum number of out-of-order responses at any point.
	Duration      time.Duration // Total duration of the download.
}

StreamingStatus holds status for a streaming download.

Type Uploader
type Uploader struct{}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrCacheInvalidBlockSize = errors.New("invalid block size")
	ErrCacheInvalidOffset    = errors.New("invalid offset")
	ErrCacheUncachedRange    = errors.New("uncached range")
	ErrInternalError         = &internalError{}
)
View Source
var ErrNotEnoughSpace = errors.New("not enough space available for the requested operation")

Functions

func CacheFilesExist

func CacheFilesExist(data, index string) (bool, bool, error)

CacheFilesExist checks if the cache and index files exist.

func CreateNewFilesForCache

func CreateNewFilesForCache(ctx context.Context, filename, indexFileName string, contentSize int64, blockSize, concurrency int, progressCh chan<- int64) error

CreateNewFilesForCache creates a new cache file and an index file for caching byte ranges of large files. It will remove any existing files with the same names before creating new ones. It reserves space for the cache file and initializes the index file with the specified content size and block size. It returns an error if the files cannot be created or if the space cannot be reserved. The index file is used to track which byte ranges have been written to the cache. The cache file is used to store the actual data. The contentSize is the total size of the file in bytes, blockSize is the preferred block size for downloading the file, and concurrency is the number of concurrent writes used to reserve space for the cache file on systems that require writing to the file to reserve space (e.g., non-Linux systems).

func NewBackoff

func NewBackoff(initial time.Duration, steps int) ratecontrol.Backoff

NewBackoff creates a new backoff instance that implements an exponential backoff algorithm unless the RetryResponse specifies a specific backoff duration. The backoff will continue for the specified number of steps, after which it will return true to indicate that no more retries should be attempted.

func NumBlocks

func NumBlocks(contentSize int64, blockSize int) int

NumBlocks calculates the number of blocks required to cover the content size given the specified block size. It returns the number of blocks needed. If the content size is not a multiple of the block size, it adds an additional block to cover the remaining bytes.

func OpenCacheFiles

func OpenCacheFiles(data, index string) (CacheFileReadWriter, CacheFileReadWriter, error)

OpenCacheFiles opens the cache and index files for reading and writing.

func Ranges

func Ranges(from, to int64, blockSize int) iter.Seq[ByteRange]

Ranges returns an iterator for all byte ranges over the specified range and block size. Each range is inclusive of the 'From' byte and the 'To' byte. The ranges are generated in blocks of the specified size, with the last block potentially being smaller than the specified block size.

func ReserveSpace

func ReserveSpace(ctx context.Context, filename string, size int64, blockSize, concurrency int, progressCh chan<- int64) error

ReserveSpace creates a file with the specified filename and allocates the specified size bytes to it. It verifies that the file was created with the requested storage allocated. On systems that support space reservation, such as Linux, space is reserved accordingly, on others data is written to the file to ensure that the space is allocated. The intent is to ensure that a download operations never fails because of insufficient local space once it has been initiated. Progress can be reported via the progressCh channel, which will receive updates on the amount of space reserved. The channel will be closed when ReserveSpace returns. If concurrency is 0 or less, it defaults to the number of CPU cores available on the system.

Types

type ByteRange

type ByteRange struct {
	From int64 // Inclusive start of the range.
	To   int64 // Inclusive end of the range.
}

ByteRange represents a range of bytes in a file. The range is inclusive of the 'From' byte and the 'To' byte as per the HTTP Range header specification/convention.

func RangeForIndex

func RangeForIndex(index int, contentSize int64, blockSize int) ByteRange

RangeForIndex returns the byte range for the specified block index in a series of blocks of the specified size over the content size. The range is inclusive of the 'From' byte and the 'To' byte. If the index is out of bounds, it returns an invalid range with From and To set to -1.

func (ByteRange) Size

func (br ByteRange) Size() int64

func (ByteRange) String

func (br ByteRange) String() string

type ByteRanges

type ByteRanges struct {
	// contains filtered or unexported fields
}

ByteRanges represents a collection of equally sized (apart from the last range), contiguous, byte ranges that can be used to track which parts of a file have or have not been 'processed', e.g downloaded, cached, uploaded etc. The ranges are represented as a bitmap, where each bit corresponds to a block of bytes of the specified size. The bitmap is used to efficiently track which byte ranges are set (processed) and which are clear (not processed). ByteRanges also allows for the contiguous head of the byte ranges to be tracked asynchronously. ByteRanges is thread-safe and can be used concurrently by multiple goroutines.

func NewByteRanges

func NewByteRanges(contentSize int64, blockSize int) *ByteRanges

NewByteRanges creates a new ByteRanges instance with the specified content size and block size. The content size is the total size of the file in bytes, and the block size is the size of each byte range in bytes.

func (*ByteRanges) AllClear

func (br *ByteRanges) AllClear(start int) iter.Seq[ByteRange]

AllClear returns an iterator for all clear byte ranges starting from 'start'. A read lock is held while iterating over the byte ranges, hence calling any other method, such as Set, which takes a write lock will block until the iteration is complete. Use NextClear if finer-grained control is needed.

func (*ByteRanges) AllSet

func (br *ByteRanges) AllSet(start int) iter.Seq[ByteRange]

AllSet returns an iterator for all set byte ranges starting from 'start'. A read lock is held while iterating over the byte ranges, hence calling any other method, such as Set, which takes a write lock will block until the iteration is complete. Use NextSet if finer-grained control is needed.

func (ByteRanges) Block

func (br ByteRanges) Block(pos int64) int

Block returns the block index for the specified position. It returns -1 if the position is out of bounds.

func (ByteRanges) BlockSize

func (br ByteRanges) BlockSize() int

BlockSize returns the size of each block in bytes.

func (ByteRanges) ContentLength

func (br ByteRanges) ContentLength() int64

ContentLength returns the total size of the content in bytes.

func (*ByteRanges) IsClear

func (br *ByteRanges) IsClear(pos int64) bool

IsClear checks if the byte range for the specified position is clear.

func (*ByteRanges) IsSet

func (br *ByteRanges) IsSet(pos int64) bool

IsSet checks if the byte range for the specified position is set.

func (*ByteRanges) MarshalJSON

func (br *ByteRanges) MarshalJSON() ([]byte, error)

MarshalJSON implements the json.Marshaler interface for ByteRanges.

func (*ByteRanges) NextClear

func (br *ByteRanges) NextClear(start int, nbr *ByteRange) int

NextClear returns the next clear byte range starting from 'start'. It starts searching from the specified start index and returns the index of the next outstanding range which can be used to continue searching for the next outstanding range. The index will be -1 if there are no more outstanding ranges.

for start := NextClear(0, &br); start >= 0; start = NextClear(start, &br) {
    // Do something with the byte range br.
}

func (*ByteRanges) NextSet

func (br *ByteRanges) NextSet(start int, nbr *ByteRange) int

NextSet returns the next set byte range starting from 'start' and behaves similarly to NextClear.

func (*ByteRanges) Notify

func (br *ByteRanges) Notify() <-chan struct{}

Notify returns a channel that is closed when the contiguous byte ranges starting at the first byte (ie. 0) are extended.

func (ByteRanges) NumBlocks

func (br ByteRanges) NumBlocks() int

NumBlocks returns the number of blocks required to cover the byte ranges represented by this ByteRanges instance.

func (*ByteRanges) Set

func (br *ByteRanges) Set(pos int64)

Set marks the byte range for the specified position as set. It has no effect if the position is out of bounds.

func (*ByteRanges) Tail

func (br *ByteRanges) Tail() (ByteRange, bool)

Tail returns the contiguous byte ranage that starts at the first byte and extends to the last contiguous byte from the start that has been set. It returns false if no ranges have been set.

func (*ByteRanges) UnmarshalJSON

func (br *ByteRanges) UnmarshalJSON(data []byte) error

UnmarshalJSON implements the json.Unmarshaler interface for ByteRanges.

type ByteRangesTracker

type ByteRangesTracker struct {
	// contains filtered or unexported fields
}

ByteRangesTracker tracks byte ranges but is not thread safe and does not support tracking the contiguous head of the byte ranges.

func NewByteRangesTracker

func NewByteRangesTracker(contentSize int64, blockSize int) *ByteRangesTracker

NewByteRangesTracker creates a new ByteRangesTracker instance with the specified content size and block size.

func (ByteRangesTracker) Block

func (br ByteRangesTracker) Block(pos int64) int

Block returns the block index for the specified position. It returns -1 if the position is out of bounds.

func (ByteRangesTracker) BlockSize

func (br ByteRangesTracker) BlockSize() int

BlockSize returns the size of each block in bytes.

func (*ByteRangesTracker) Clear

func (br *ByteRangesTracker) Clear(pos int64)

Clear clears the byte range for the specified position.

func (ByteRangesTracker) ContentLength

func (br ByteRangesTracker) ContentLength() int64

ContentLength returns the total size of the content in bytes.

func (ByteRangesTracker) IsClear

func (br ByteRangesTracker) IsClear(pos int64) bool

IsClear checks if the byte range for the specified position is clear.

func (ByteRangesTracker) IsSet

func (br ByteRangesTracker) IsSet(pos int64) bool

IsSet checks if the byte range for the specified position is set.

func (*ByteRangesTracker) MarshalJSON

func (br *ByteRangesTracker) MarshalJSON() ([]byte, error)

func (ByteRangesTracker) NextClear

func (br ByteRangesTracker) NextClear(start int, nbr *ByteRange) int

NextClear returns the next clear byte range starting from 'start'. It starts searching from the specified start index and returns the index of the next outstanding range which can be used to continue searching for the next outstanding range. The index will be -1 if there are no more outstanding ranges.

for start := NextClear(0, &br); start >= 0; start = NextClear(start, &br) {
    // Do something with the byte range br.
}

func (ByteRangesTracker) NextSet

func (br ByteRangesTracker) NextSet(start int, nbr *ByteRange) int

NextSet returns the next set byte range starting from 'start' and behaves similarly to NextClear.

func (ByteRangesTracker) NumBlocks

func (br ByteRangesTracker) NumBlocks() int

NumBlocks returns the number of blocks required to cover the byte ranges represented by this ByteRanges instance.

func (*ByteRangesTracker) Set

func (br *ByteRangesTracker) Set(pos int64)

Set marks the byte range for the specified position as set. It has no effect if the position is out of bounds.

func (*ByteRangesTracker) UnmarshalJSON

func (br *ByteRangesTracker) UnmarshalJSON(data []byte) error

type CacheFileReadWriter

type CacheFileReadWriter interface {
	io.Reader
	io.Writer
	io.ReaderAt
	io.WriterAt
	io.Closer
	Name() string // Name returns the name of the file.
	Sync() error  // Sync ensures that all writes to the cache file are flushed to disk.
}

CacheFileReadWriter is an interface that combines the functionality of io.Reader, io.ReaderAt, io.WriterAt, and io.Closer for reading and writing to the cache and index files. It also includes a Sync method to ensure that all writes are flushed to disk.

type CachingDownloader

type CachingDownloader struct {
	// contains filtered or unexported fields
}

CachingDownloader is a downloader that caches streamed downloaded data to a local cache and supports resuming downloads from where they left off.

func NewCachingDownloader

func NewCachingDownloader(file Reader, cache DownloadCache, opts ...DownloadOption) (*CachingDownloader, error)

NewCachingDownloader creates a new CachingDownloader instance.

func (*CachingDownloader) Run

Run executes the downloaded process. If the downloader encounters any errors it will return an

type DownloadCache

type DownloadCache interface {
	// ContentLengthAndBlockSize returns the total length of the file in bytes
	// and the block size used for downloading the file.
	ContentLengthAndBlockSize() (int64, int)

	// CachedBytesAndBlocks returns the total number of bytes and blocks already stored in
	// the cache.
	CachedBytesAndBlocks() (bytes, blocks int64)

	// NextOutstanding finds the next byte range that has not been cached
	// starting from the specified 'start' index. Its return value is either
	// -1 if there are no more outstanding ranges, or the value of the next
	// starting index to continue searching at.
	// To iterate over all outstanding ranges, call this method repeatedly
	// until it returns -1 as follows:
	//    for start := NextOutstanding(0, &br); start != -1; start = NextOutstanding(start, &br) {
	//        // Do something with the byte range br.
	//    }
	NextOutstanding(start int, br *ByteRange) int

	// NextCached finds the next byte range that has been cached in the same manner
	// as NextOutstanding.
	NextCached(start int, br *ByteRange) int

	// Tail returns the contiguous range of bytes that have been cached so far.
	// If this has not grown since the last call to Tail, Tail will block until
	// the tail is extended. If the context is done before the tail is extended,
	// it returns a ByteRange with From and To set to -1.
	Tail(context.Context) ByteRange

	// Complete returns true if all byte ranges have been cached.
	Complete() bool

	// WriteAt writes at most blocksize bytes starting at the specified offset.
	// Offset must be aligned with the block boundaries. It returns an error if
	// data is not exactly blocksize bytes long, unless the offset is at the
	// end of the file, in which case it must be (content length % blocksize)
	// bytes long.
	WriteAt(data []byte, off int64) (int, error)

	// ReadAt reads at most len(data) bytes starting at the specified offset.
	// It returns an error if any of the data to be read is not already cached.
	// The offset need not be aligned with the block boundaries.
	ReadAt(data []byte, off int64) (int, error)
}

DownloadCache is an interface for caching byte ranges of large files to support resumable downloads.

type DownloadOption

type DownloadOption func(*downloadOptions)

func WithDownloadConcurrency

func WithDownloadConcurrency(n int) DownloadOption

WithDownloadConcurrency sets the number of concurrent download goroutines.

func WithDownloadDigest

func WithDownloadDigest(h digests.Hash) DownloadOption

WithDownloadDigest sets the hash function to be used for computing the digest of the downloaded data as it is streamed.

func WithDownloadLogger

func WithDownloadLogger(logger *slog.Logger) DownloadOption

WithDownloadLogger sets the logger for the download.

func WithDownloadProgress

func WithDownloadProgress(progress chan<- DownloadStats) DownloadOption

WithDownloadProgress sets the channel to report download progress.

func WithDownloadProgressTimeout

func WithDownloadProgressTimeout(timeout time.Duration) DownloadOption

WithDownloadProgressDelay sets a timeout for certain progress updates, such as those sent when a download is completed to allow for the caller to process any pending updates. Routine updates, such as those sent whenever a new byte range is downloaded are sent on a best-effort basis.

func WithDownloadRateController

func WithDownloadRateController(rc ratecontrol.Limiter) DownloadOption

WithDownloadRateController sets the rate controller for the download.

func WithDownloadWaitForCompletion

func WithDownloadWaitForCompletion(wait bool) DownloadOption

WithDownloadWaitForCompletion sets whether the download should iterate, until the download is successfully completed, or return after one iteration. An iteration represents a single pass through the download process whereby every outstsanding byte range is attempted to be downloaded once with retries. A download will either complete after any specified retries or be left outstanding for the next iteration.

type DownloadStats

type DownloadStats struct {
	CachedOrStreamedBytes  int64 // Total bytes cached or streamed.
	CachedOrStreamedBlocks int64 // Total blocks cached or streamed.
	CacheErrors            int64 // Total number of errors encountered while caching.
	DownloadedBytes        int64 // Total bytes downloaded so far.
	DownloadedBlocks       int64 // Total blocks downloaded so far.
	DownloadSize           int64 // Total size of the file in bytes.
	DownloadBlocks         int64 // Total number of blocks to download.
	DownloadRetries        int64 // Total number of retries made during the download.
	DownloadErrors         int64 // Total number of errors encountered during the download.
	Iterations             int64 // Number of iterations requiredd to complete the download.
}

DownloadStats statistics on the download process and is used for progress reporting.

type DownloadStatus

type DownloadStatus struct {
	DownloadStats
	Resumable bool          // Indicates if the download can be re-run.
	Complete  bool          // Indicates if the download completed successfully.
	Duration  time.Duration // Total duration of the download.
}

DownloadStatus holds the status for a completed download operation, including the progress made, whether the download is resumable, completed and the total duration of operation.

type LocalDownloadCache

type LocalDownloadCache struct {
	// contains filtered or unexported fields
}

LocalDownloadCache is a concrete implementation of RangeCache that uses a local file to cache byte ranges of large files. It allows for concurrent access.

func NewLocalDownloadCache

func NewLocalDownloadCache(dataReadWriter, indexReadWriter CacheFileReadWriter) (*LocalDownloadCache, error)

NewLocalDownloadCache creates a new LocalDownloadCache instance. It opens the cache file and loads the index file containing the byte ranges that have been written to the cache. It returns an error if the files cannot be opened or if the index file cannot be loaded. The cache file is used to store the actual data, and the index file is used to track which byte ranges have been written to the cache. The cache and index files must already exist and are expected to be have been created using NewFilesForCache.

func (*LocalDownloadCache) CachedBytesAndBlocks

func (c *LocalDownloadCache) CachedBytesAndBlocks() (bytes, blocks int64)

func (*LocalDownloadCache) Close

func (c *LocalDownloadCache) Close() error

Close implements DownloadCache.

func (*LocalDownloadCache) Complete

func (c *LocalDownloadCache) Complete() bool

Complete implements DownloadCache. It returns true if all byte ranges have been cached, meaning there are no more uncached ranges.

func (*LocalDownloadCache) ContentLengthAndBlockSize

func (c *LocalDownloadCache) ContentLengthAndBlockSize() (int64, int)

ContentLengthAndBlockSize implements DownloadCache.

func (*LocalDownloadCache) NextCached

func (c *LocalDownloadCache) NextCached(start int, br *ByteRange) int

NextCached implements DownloadCache. It returns the next, if any, cached byte range starting from the specified index.

func (*LocalDownloadCache) NextOutstanding

func (c *LocalDownloadCache) NextOutstanding(start int, br *ByteRange) int

NextOutstanding implements DownloadCache. It returns the next, if any, uncached byte range starting from the specified index.

func (*LocalDownloadCache) ReadAt

func (c *LocalDownloadCache) ReadAt(data []byte, off int64) (int, error)

ReadAt implements DownloadCache.

func (*LocalDownloadCache) Tail

Tail implements DownloadCache. It returns the contiguous range of bytes that have been cached so far. If this has not grown since the last call to Tail, Tail will block until the tail is extended.

func (*LocalDownloadCache) WriteAt

func (c *LocalDownloadCache) WriteAt(data []byte, off int64) (int, error)

WriteAt implements DownloadCache.

type Reader

type Reader interface {
	Name() string // Name returns the name of the file being read.

	// ContentLengthAndBlockSize returns the total length of the file in bytes
	// and the preferred block size used for downloading the file.
	ContentLengthAndBlockSize() (int64, int)

	// Digest returns an instance of digests.Hash that can be used to
	// compute and validate the digest of the file.
	// If the file does not have a digest digest.IsSet() will return false.
	Digest() digests.Hash

	// GetReader retrieves a byte range from the file and returns
	// a reader that can be used to access that data range. In addition to the
	// error, the RetryResponse is returned which indicates whether the
	// operation can be retried and the duration to wait before retrying.
	GetReader(ctx context.Context, from, to int64) (io.ReadCloser, RetryResponse, error)
}

Reader provides support for downloading very large files efficiently concurrently and to allow for resumption of partial downloads.

type RetryResponse

type RetryResponse interface {
	// IsRetryable checks if the error is retryable.
	IsRetryable() bool

	// BackoffDuration returns true if a specific backoff duration is specified
	// in the response, in which case the duration is returned. If false
	// no specific backoff duration is requested and the backoff algorithm
	// should fallback to something appropriate, such as exponential backoff.
	BackoffDuration() (bool, time.Duration)
}

RetryResponse allows the caller to determine whether an operation that failed with a retryable error can be retried and how long to wait before retrying the operation.

type StreamingDownloader

type StreamingDownloader struct {
	// contains filtered or unexported fields
}

StreamingDownloader is a downloader that streams data from a large file. The downloader uses concurrent byte range requests to fetch data and then serializes the responses into a single stream for reading.

func NewStreamingDownloader

func NewStreamingDownloader(file Reader, opts ...DownloadOption) *StreamingDownloader

NewStreamingDownloader creates a new StreamingDownloader instance.

func (*StreamingDownloader) ContentLength

func (dl *StreamingDownloader) ContentLength() int64

ContentLength returns the content length header for the file being downloaded.

func (*StreamingDownloader) Read

func (dl *StreamingDownloader) Read(buf []byte) (int, error)

Read implements io.Reader.

func (*StreamingDownloader) Reader

func (dl *StreamingDownloader) Reader() io.Reader

Reader returns an io.Reader.

func (*StreamingDownloader) Run

type StreamingStatus

type StreamingStatus struct {
	DownloadStats
	OutOfOrder    int64         // Total number of out-of-order responses encountered.
	MaxOutOfOrder int64         // Maximum number of out-of-order responses at any point.
	Duration      time.Duration // Total duration of the download.
}

StreamingStatus holds status for a streaming download.

type Uploader

type Uploader struct{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL