Documentation
¶
Index ¶
- Variables
- func AcquireToken(destination *url.URL, dirResp server_structs.DirectorResponse, ...) (string, error)
- func ByteCountSI(b int64) string
- func CanDisableProxy() bool
- func ContextWithRequestId(ctx context.Context, id string) context.Context
- func CreateSharingUrl(ctx context.Context, objectUrl *url.URL, isWrite bool) (string, error)
- func DoCacheInfo(ctx context.Context, destination string, options ...TransferOption) (age int, size int64, err error)
- func DoDelete(ctx context.Context, remoteDestination string, recursive bool, ...) (err error)
- func DoEvict(ctx context.Context, remoteObject string, immediate bool, ...) (message string, err error)
- func DoShadowIngest(ctx context.Context, sourceFile string, originPrefix string, ...) (int64, string, error)
- func GetBehavior(behaviorName string) (packerBehavior, error)
- func GetDirectorInfoForPath(ctx context.Context, pUrl *pelican_url.PelicanURL, httpMethod string, ...) (parsedResponse server_structs.DirectorResponse, err error)
- func GetObjectServerHostnames(ctx context.Context, testFile string) (urls []string, err error)
- func HttpDigestFromChecksum(checksumType ChecksumType) string
- func IsRetryable(err error) bool
- func KnownChecksumTypesAsHttpDigest() (result []string)
- func NewTokenGenerator(dest *pelican_url.PelicanURL, dirResp *server_structs.DirectorResponse, ...) *tokenGenerator
- func ParseDirectorInfo(dirResp *http.Response) (server_structs.DirectorResponse, error)
- func ParseRemoteAsPUrl(ctx context.Context, rp string) (*pelican_url.PelicanURL, error)
- func RequestIdFromContext(ctx context.Context) (string, bool)
- func ResetJobAd()
- func ShouldRetry(err error) bool
- func WithDirectorDebug(ctx context.Context) context.Context
- type ByteRange
- type ChecksumInfo
- type ChecksumMismatchError
- type ChecksumType
- type ConnectionSetupError
- type ConstantSizer
- type DirRespCache
- func (c *DirRespCache) Invalidate(prefix string)
- func (c *DirRespCache) InvalidateAll()
- func (c *DirRespCache) Len() int
- func (c *DirRespCache) Lookup(objectPath string) (server_structs.DirectorResponse, bool)
- func (c *DirRespCache) LookupOrLoad(ctx context.Context, objectPath string, loader DirRespLoader) (server_structs.DirectorResponse, error)
- func (c *DirRespCache) Store(prefix string, objectPath string, resp server_structs.DirectorResponse)
- type DirRespLoader
- type FileInfo
- type HeaderTimeoutError
- type HttpErrResp
- type InvalidByteInChunkLengthError
- type NetworkResetError
- type PelicanFS
- type PelicanFile
- func (pf *PelicanFile) Close() error
- func (pf *PelicanFile) Read(p []byte) (n int, err error)
- func (pf *PelicanFile) ReadAt(p []byte, off int64) (n int, err error)
- func (pf *PelicanFile) ReadDir(n int) ([]fs.DirEntry, error)
- func (pf *PelicanFile) Seek(offset int64, whence int) (int64, error)
- func (pf *PelicanFile) Stat() (fs.FileInfo, error)
- func (pf *PelicanFile) Write(p []byte) (n int, err error)
- type PermissionDeniedError
- type ServerPriority
- type Sizer
- type SlowTransferError
- type StatusCodeError
- type StoppedTransferError
- type SyncLevel
- type TimestampedError
- type TokenProvider
- type TransferAttemptError
- type TransferCallbackFunc
- type TransferClient
- func (tc *TransferClient) CacheInfo(ctx context.Context, remoteUrl *url.URL, options ...TransferOption) (age int, size int64, err error)
- func (tc *TransferClient) Cancel()
- func (tc *TransferClient) Close()
- func (tc *TransferClient) NewCopyJob(ctx context.Context, src *url.URL, dest *url.URL, recursive bool, ...) (tj *TransferJob, err error)
- func (tc *TransferClient) NewPrestageJob(ctx context.Context, remoteUrl *url.URL, options ...TransferOption) (tj *TransferJob, err error)
- func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL, localPath string, upload bool, ...) (tj *TransferJob, err error)
- func (tc *TransferClient) Results() chan TransferResults
- func (tc *TransferClient) Shutdown() (results []TransferResults, err error)
- func (tc *TransferClient) Submit(tj *TransferJob) error
- type TransferEngine
- type TransferErrors
- type TransferJob
- type TransferMetadata
- type TransferOption
- func WithAcquireToken(enable bool) TransferOption
- func WithByteRange(start, end int64) TransferOption
- func WithCacheEmbeddedClientMode() TransferOption
- func WithCaches(caches ...*url.URL) TransferOption
- func WithCallback(callback TransferCallbackFunc) TransferOption
- func WithCollectionsUrl(url string) TransferOption
- func WithDepth(depth int) TransferOption
- func WithDestinationAcquireToken(enable bool) TransferOption
- func WithDestinationToken(token string) TransferOption
- func WithDestinationTokenLocation(location string) TransferOption
- func WithDryRun(enable bool) TransferOption
- func WithFedToken(provider TokenProvider) TransferOption
- func WithForcePrestageAPI(force bool) TransferOption
- func WithInPlace(inPlace bool) TransferOption
- func WithMetadataChannel(ch chan<- TransferMetadata) TransferOption
- func WithReader(reader io.ReadCloser) TransferOption
- func WithRecursive(recursive bool) TransferOption
- func WithRequestChecksums(types []ChecksumType) TransferOption
- func WithRequestId(id string) TransferOption
- func WithRequireChecksum() TransferOption
- func WithSourceAcquireToken(enable bool) TransferOption
- func WithSourceToken(token string) TransferOption
- func WithSourceTokenLocation(location string) TransferOption
- func WithSynchronize(level SyncLevel) TransferOption
- func WithToken(token string) TransferOption
- func WithTokenLocation(location string) TransferOption
- func WithWriter(writer io.WriteCloser) TransferOption
- type TransferResult
- type TransferResults
- func DoCopy(ctx context.Context, sourceFile string, destination string, recursive bool, ...) (transferResults []TransferResults, err error)
- func DoGet(ctx context.Context, remoteObject string, localDestination string, ...) (transferResults []TransferResults, err error)
- func DoPrestage(ctx context.Context, prefixUrl string, options ...TransferOption) (transferResults []TransferResults, err error)
- func DoPut(ctx context.Context, localObject string, remoteDestination string, ...) (transferResults []TransferResults, err error)
- type UnexpectedEOFError
Constants ¶
This section is empty.
Variables ¶
var ( PelicanError error_codes.PelicanError // Indicates the origin responded too slowly after the cache tried to download from it CacheTimedOutReadingFromOrigin = errors.New("cache timed out waiting on origin") // ErrObjectNotFound is returned when the requested remote object does not exist. ErrObjectNotFound = errors.New("remote object not found") )
var (
ErrServerChecksumMissing = errors.New("no checksum information was returned by server but checksums were required by the client")
)
var ObjectServersToTry int = 3
Number of caches to attempt to use in any invocation
Functions ¶
func AcquireToken ¶
func AcquireToken(destination *url.URL, dirResp server_structs.DirectorResponse, opts config.TokenGenerationOpts) (string, error)
Given a URL and a director Response, attempt to acquire a valid token for that URL.
func ByteCountSI ¶
Convert b bytes to a human-friendly string with SI units
For example, ByteCountSI(2000) returns "2 KB"
func CanDisableProxy ¶
func CanDisableProxy() bool
Determine whether we are allowed to skip the proxy as a fallback
func ContextWithRequestId ¶
ContextWithRequestId returns a child context that carries the given request ID. Downstream code can retrieve it with RequestIdFromContext.
func CreateSharingUrl ¶
func DoCacheInfo ¶
func DoCacheInfo(ctx context.Context, destination string, options ...TransferOption) (age int, size int64, err error)
Check the cache information of a remote cache
func DoDelete ¶
func DoDelete(ctx context.Context, remoteDestination string, recursive bool, options ...TransferOption) (err error)
DoDelete queries the director using the DELETE HTTP method, retrieves the token, and initializes the delete operation.
func DoEvict ¶
func DoEvict(ctx context.Context, remoteObject string, immediate bool, options ...TransferOption) (message string, err error)
DoEvict evicts cached objects matching the given path (or prefix) from the local cache. Token bootstrapping follows the same logic as DoGet: tokens are discovered from the environment, credential files, or negotiated via OAuth when needed.
The remoteObject argument is a pelican:// or osdf:// URL (or a schemeless namespace path when federation discovery is configured).
When immediate is true the objects are deleted right away; otherwise they are marked for priority eviction (purge-first).
func DoShadowIngest ¶
func GetBehavior ¶
func GetDirectorInfoForPath ¶
func GetDirectorInfoForPath(ctx context.Context, pUrl *pelican_url.PelicanURL, httpMethod string, token string) (parsedResponse server_structs.DirectorResponse, err error)
Retrieve federation namespace information for a given URL.
This is the public API; it always queries the director's default endpoint. Internal callers that need embedded cache-mode behaviour should use getDirectorInfoForPath instead.
func HttpDigestFromChecksum ¶
func HttpDigestFromChecksum(checksumType ChecksumType) string
func IsRetryable ¶
IsRetryable will return true if the error is retryable
func KnownChecksumTypesAsHttpDigest ¶
func KnownChecksumTypesAsHttpDigest() (result []string)
List all the checksum types known as HTTP digest strings
func NewTokenGenerator ¶
func NewTokenGenerator(dest *pelican_url.PelicanURL, dirResp *server_structs.DirectorResponse, operation config.TokenOperation, enableAcquire bool) *tokenGenerator
NewTokenGenerator creates a token generator for the given destination and operation. This is the exported entry point used by cmd/token.go; most internal callers should use the unexported newTokenGenerator.
func ParseDirectorInfo ¶
func ParseDirectorInfo(dirResp *http.Response) (server_structs.DirectorResponse, error)
Given the Director response, parse the headers and construct the ordered list of object servers.
func ParseRemoteAsPUrl ¶
func ParseRemoteAsPUrl(ctx context.Context, rp string) (*pelican_url.PelicanURL, error)
Given a remote path, use the client's wisdom to parse it as a Pelican URL, including metadata discovery.
This will handle setting up the URL cache, passing along contexts to discovery, and passing the client context/user agent. Calling this should return a fully populated PelicanURL object, including any metadata that was discovered.
func RequestIdFromContext ¶
RequestIdFromContext extracts a request ID previously stored with ContextWithRequestId. Returns ("", false) if none is set.
func ResetJobAd ¶
func ResetJobAd()
Reset the memory-cached copy of the HTCondor job ad
The client will search through the process's environment to find a HTCondor "job ad" and cache its contents in memory; the job ad is used to determine the project name and job ID for the transfer headers.
This function is used to reset the job ad and is intended for use in unit tests that need to reset things from outside the cache package.
func ShouldRetry ¶
Types ¶
type ByteRange ¶
ByteRange specifies a byte range for partial object transfers Start and End are inclusive byte offsets (0-indexed) End of -1 means "to end of file"
type ChecksumInfo ¶
type ChecksumInfo struct {
Algorithm ChecksumType
Value []byte
}
Value of one checksum calculation
type ChecksumMismatchError ¶
type ChecksumMismatchError struct {
Info ChecksumInfo // The checksum that was calculated by the client
ServerValue []byte // The checksum value that was calculated by the server
}
Represents a mismatched checksum
func (*ChecksumMismatchError) Error ¶
func (e *ChecksumMismatchError) Error() string
type ChecksumType ¶
type ChecksumType int
const ( // The checksum algorithms supported by the client // // Note we have a helper function, KnownChecksumTypes, that returns a list // of all the elements enumerated below; do not skip integers in this list // or that functionality will break. // AlgMD5 ChecksumType = iota // Checksum is using the MD5 algorithm AlgCRC32C // Checksum is using the CRC32C algorithm AlgCRC32 // Checksum is using the CRC32 algorithm AlgSHA1 // Checksum is using the SHA-1 algorithm AlgUnknown // Unknown checksum algorithm. Always a "trailer" indicating the last known algorithm. AlgDefault = AlgCRC32C // Default checksum algorithm is CRC32C if the client doesn't specify one. )
func ChecksumFromHttpDigest ¶
func ChecksumFromHttpDigest(httpDigest string) ChecksumType
func KnownChecksumTypes ¶
func KnownChecksumTypes() (result []ChecksumType)
List all the checksum types known to the client
type ConnectionSetupError ¶
ConnectionSetupError is an error that is returned when a connection to the remote server fails
func (*ConnectionSetupError) Error ¶
func (e *ConnectionSetupError) Error() string
func (*ConnectionSetupError) Is ¶
func (e *ConnectionSetupError) Is(target error) bool
func (*ConnectionSetupError) Unwrap ¶
func (e *ConnectionSetupError) Unwrap() error
type ConstantSizer ¶
type ConstantSizer struct {
// contains filtered or unexported fields
}
func (*ConstantSizer) BytesComplete ¶
func (cs *ConstantSizer) BytesComplete() int64
func (*ConstantSizer) Size ¶
func (cs *ConstantSizer) Size() int64
type DirRespCache ¶
type DirRespCache struct {
// contains filtered or unexported fields
}
DirRespCache caches DirectorResponse values keyed by namespace prefix.
It supports longest-prefix matching: given a path like "/federation/data/subdir/file.txt", it will match an entry stored under the prefix "/federation/data" (but not "/federation/other").
Concurrent cache misses for paths that would map to the same singleflight key are coalesced: only one director query is issued and all waiters receive the same result.
Entries expire after a configurable TTL. The cache is safe for concurrent use.
func NewDirRespCache ¶
func NewDirRespCache(ttl time.Duration) *DirRespCache
NewDirRespCache creates a new prefix-matching cache for director responses. Entries are considered valid for `ttl` after they are stored.
func (*DirRespCache) Invalidate ¶
func (c *DirRespCache) Invalidate(prefix string)
Invalidate removes the entry for the given prefix.
func (*DirRespCache) InvalidateAll ¶
func (c *DirRespCache) InvalidateAll()
InvalidateAll removes all cached entries.
func (*DirRespCache) Len ¶
func (c *DirRespCache) Len() int
Len returns the number of entries in the cache (including expired ones that haven't been cleaned up yet).
func (*DirRespCache) Lookup ¶
func (c *DirRespCache) Lookup(objectPath string) (server_structs.DirectorResponse, bool)
Lookup finds the longest cached prefix that matches `objectPath`.
For example, if the cache contains entries for "/a/b" and "/a", a lookup for "/a/b/c/d.txt" will return the entry for "/a/b".
Returns the cached DirectorResponse and true if a valid (non-expired) entry was found, or the zero value and false otherwise.
func (*DirRespCache) LookupOrLoad ¶
func (c *DirRespCache) LookupOrLoad(ctx context.Context, objectPath string, loader DirRespLoader) (server_structs.DirectorResponse, error)
LookupOrLoad checks the cache first; on a miss it calls `loader` exactly once per unique objectPath, coalescing concurrent callers via singleflight.
If the context is cancelled while waiting for an in-flight query, the waiter returns ctx.Err() immediately. The underlying query keeps running so that other waiters (with live contexts) still receive the result.
On success the response is automatically stored in the cache under the prefix returned by the loader.
func (*DirRespCache) Store ¶
func (c *DirRespCache) Store(prefix string, objectPath string, resp server_structs.DirectorResponse)
Store saves a DirectorResponse under the given prefix. Any previous entry for the same prefix is replaced.
objectPath is the federation object path (e.g. "/test/file.txt") that was used to obtain this response from the director. It is stripped from each ObjectServer URL so the cached entry contains only the server-side base path. Pass "" if no stripping is needed.
type DirRespLoader ¶
type DirRespLoader func(ctx context.Context) (resp server_structs.DirectorResponse, prefix string, err error)
DirRespLoader is a function that queries the director for a given object path. It returns the DirectorResponse and the namespace prefix that should be used as the cache key.
type FileInfo ¶
type FileInfo struct {
Name string
Size int64
ModTime time.Time
IsCollection bool
ETag string `json:"etag,omitempty"` // HTTP ETag header value
Checksums map[string]string `json:"checksums,omitempty"` // Checksum type (HTTP digest name) to hex-encoded value
}
Our own FileInfo structure to hold information about a file NOTE: this was created to provide more flexibility to information on a file. The fs.FileInfo interface was causing some issues like not always returning a Name attribute ALSO NOTE: the fields are exported so they can be marshalled into JSON, it does not work otherwise
type HeaderTimeoutError ¶
type HeaderTimeoutError struct{}
func (*HeaderTimeoutError) Error ¶
func (e *HeaderTimeoutError) Error() string
func (*HeaderTimeoutError) Is ¶
func (e *HeaderTimeoutError) Is(target error) bool
type HttpErrResp ¶
func (*HttpErrResp) Error ¶
func (e *HttpErrResp) Error() string
func (*HttpErrResp) Unwrap ¶
func (e *HttpErrResp) Unwrap() error
type InvalidByteInChunkLengthError ¶
type InvalidByteInChunkLengthError struct {
Err error
}
func (*InvalidByteInChunkLengthError) Error ¶
func (e *InvalidByteInChunkLengthError) Error() string
func (*InvalidByteInChunkLengthError) Is ¶
func (e *InvalidByteInChunkLengthError) Is(target error) bool
func (*InvalidByteInChunkLengthError) Unwrap ¶
func (e *InvalidByteInChunkLengthError) Unwrap() error
type NetworkResetError ¶
type NetworkResetError struct{}
func (*NetworkResetError) Error ¶
func (e *NetworkResetError) Error() string
type PelicanFS ¶
type PelicanFS struct {
// contains filtered or unexported fields
}
PelicanFS implements io.FS for the Pelican data federation. It provides a filesystem-like interface to objects stored in the federation.
func NewPelicanFS ¶
func NewPelicanFS(ctx context.Context, options ...TransferOption) *PelicanFS
NewPelicanFS creates a new filesystem interface to the Pelican federation. The provided context is used for all operations, and the options are applied to all transfers. If urlPrefix is empty or "/", it defaults to "osdf:///".
func NewPelicanFSWithPrefix ¶
func NewPelicanFSWithPrefix(ctx context.Context, urlPrefix string, options ...TransferOption) *PelicanFS
NewPelicanFSWithPrefix creates a new filesystem interface with a URL prefix. All paths will be relative to this prefix. If prefix is empty or "/", defaults to "osdf:///".
type PelicanFile ¶
type PelicanFile struct {
// contains filtered or unexported fields
}
PelicanFile represents an open file in the Pelican federation. It implements fs.File, io.ReaderAt, io.Seeker, io.Writer, and fs.ReadDirFile.
Thread-safety: Most fields that can change during the file's lifetime are protected by mu. The currentEndpoint field uses atomic operations for lock-free access during range reads. Functions with "Locked" suffix must be called with mu held.
func (*PelicanFile) Close ¶
func (pf *PelicanFile) Close() error
Close closes the file, rendering it unusable for I/O. It implements fs.File.
func (*PelicanFile) Read ¶
func (pf *PelicanFile) Read(p []byte) (n int, err error)
Read reads up to len(p) bytes into p.
func (*PelicanFile) ReadAt ¶
func (pf *PelicanFile) ReadAt(p []byte, off int64) (n int, err error)
ReadAt reads len(p) bytes into p starting at offset off in the file. It implements io.ReaderAt. Note: ReadAt does not affect the file position.
func (*PelicanFile) ReadDir ¶
func (pf *PelicanFile) ReadDir(n int) ([]fs.DirEntry, error)
ReadDir reads the contents of the directory and returns a slice of DirEntry values. It implements fs.ReadDirFile. Can be called multiple times to paginate through entries.
func (*PelicanFile) Seek ¶
func (pf *PelicanFile) Seek(offset int64, whence int) (int64, error)
Seek sets the offset for the next Read operation and returns the new offset. It implements io.Seeker.
type PermissionDeniedError ¶
type PermissionDeniedError struct {
// contains filtered or unexported fields
}
PermissionDeniedError is returned when a 403 status code is received. The message is generated based on the token's validity.
func (*PermissionDeniedError) Error ¶
func (e *PermissionDeniedError) Error() string
type ServerPriority ¶
type SlowTransferError ¶
type SlowTransferError struct {
BytesTransferred int64
BytesPerSecond int64
BytesTotal int64
Duration time.Duration
CacheAge time.Duration
}
SlowTransferError is an error that is returned when a transfer takes longer than the configured timeout
func (*SlowTransferError) Error ¶
func (e *SlowTransferError) Error() (errMsg string)
func (*SlowTransferError) Is ¶
func (e *SlowTransferError) Is(target error) bool
type StatusCodeError ¶
type StatusCodeError int
StatusCodeError indicates the server returned a non-200 code.
The wrapper is done to provide a Pelican-based error hierarchy in case we ever decide to have a different underlying download package.
func (*StatusCodeError) Error ¶
func (e *StatusCodeError) Error() string
func (*StatusCodeError) Is ¶
func (e *StatusCodeError) Is(target error) bool
type StoppedTransferError ¶
type StoppedTransferError struct {
BytesTransferred int64
StoppedTime time.Duration
CacheHit bool
Upload bool
}
Error type for when the transfer started to return data then completely stopped
func (*StoppedTransferError) Error ¶
func (e *StoppedTransferError) Error() (errMsg string)
func (*StoppedTransferError) Is ¶
func (e *StoppedTransferError) Is(target error) bool
type TimestampedError ¶
type TimestampedError struct {
// contains filtered or unexported fields
}
func (*TimestampedError) Error ¶
func (te *TimestampedError) Error() string
func (*TimestampedError) Unwrap ¶
func (te *TimestampedError) Unwrap() error
type TokenProvider ¶
TokenProvider returns a token value, refreshing as needed. Implementations must be safe for concurrent use. See tokenGenerator for the standard implementation.
func StaticTokenProvider ¶
func StaticTokenProvider(token string) TokenProvider
StaticTokenProvider returns a TokenProvider that always yields the given token string. Useful when the caller has a fixed token and does not need refresh logic.
type TransferAttemptError ¶
type TransferAttemptError struct {
// contains filtered or unexported fields
}
Transfer attempt error wraps an error with information about the service/proxy used
func (*TransferAttemptError) Error ¶
func (tae *TransferAttemptError) Error() (errMsg string)
func (*TransferAttemptError) Is ¶
func (tae *TransferAttemptError) Is(target error) bool
func (*TransferAttemptError) Unwrap ¶
func (tae *TransferAttemptError) Unwrap() error
type TransferCallbackFunc ¶
type TransferClient ¶
type TransferClient struct {
// contains filtered or unexported fields
}
A client to the transfer engine.
func (*TransferClient) CacheInfo ¶
func (tc *TransferClient) CacheInfo(ctx context.Context, remoteUrl *url.URL, options ...TransferOption) (age int, size int64, err error)
cacheInfo retrieves and returns the age and size of the specified object.
func (*TransferClient) Cancel ¶
func (tc *TransferClient) Cancel()
Cancel a client
When cancelled, all channels and goroutines associated with the client will close/exit immediately.
func (*TransferClient) Close ¶
func (tc *TransferClient) Close()
Close the transfer client object
Any subsequent job submissions will cause a panic
func (*TransferClient) NewCopyJob ¶
func (tc *TransferClient) NewCopyJob(ctx context.Context, src *url.URL, dest *url.URL, recursive bool, options ...TransferOption) (tj *TransferJob, err error)
Create a new third-party copy job for the client.
This creates a transfer that uses the HTTP COPY verb to instruct the destination server to pull data directly from the source, without the client acting as an intermediary.
The returned object can be further customized as desired. This function does not "submit" the job for execution.
func (*TransferClient) NewPrestageJob ¶
func (tc *TransferClient) NewPrestageJob(ctx context.Context, remoteUrl *url.URL, options ...TransferOption) (tj *TransferJob, err error)
Create a new prestage job for the client
The returned object can be further customized as desired. This function does not "submit" the job for execution.
func (*TransferClient) NewTransferJob ¶
func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL, localPath string, upload bool, recursive bool, options ...TransferOption) (tj *TransferJob, err error)
Create a new transfer job for the client
The returned object can be further customized as desired. This function does not "submit" the job for execution.
func (*TransferClient) Results ¶
func (tc *TransferClient) Results() chan TransferResults
Return a channel containing the results from the client
func (*TransferClient) Shutdown ¶
func (tc *TransferClient) Shutdown() (results []TransferResults, err error)
Shutdown the transfer client
Closes the client and waits for all jobs to exit cleanly. Returns any results that were pending when Shutdown was called
func (*TransferClient) Submit ¶
func (tc *TransferClient) Submit(tj *TransferJob) error
Submit the transfer job to the client for processing
type TransferEngine ¶
type TransferEngine struct {
// contains filtered or unexported fields
}
An object able to process transfer jobs.
func NewTransferEngine ¶
func NewTransferEngine(ctx context.Context) (te *TransferEngine, err error)
Returns a new transfer engine object whose lifetime is tied to the provided context. Will launcher worker goroutines to handle the underlying transfers
func (*TransferEngine) Close ¶
func (te *TransferEngine) Close()
Closes the TransferEngine. No new work may be submitted. Any ongoing work will continue
func (*TransferEngine) NewClient ¶
func (te *TransferEngine) NewClient(options ...TransferOption) (client *TransferClient, err error)
Create a new client to work with an engine
func (*TransferEngine) Shutdown ¶
func (te *TransferEngine) Shutdown() error
Initiates a shutdown of the transfer engine. Waits until all workers have finished
type TransferErrors ¶
type TransferErrors struct {
// contains filtered or unexported fields
}
A container object for multiple sub-errors representing transfer failures.
func NewTransferErrors ¶
func NewTransferErrors() *TransferErrors
Create a new transfer error object
func (*TransferErrors) AddError ¶
func (te *TransferErrors) AddError(err error)
func (*TransferErrors) AddPastError ¶
func (te *TransferErrors) AddPastError(err error, timestamp time.Time)
func (*TransferErrors) AllErrorsRetryable ¶
func (te *TransferErrors) AllErrorsRetryable() bool
Returns true if all errors are retryable. If no errors are present, then returns true
func (*TransferErrors) Error ¶
func (te *TransferErrors) Error() string
func (*TransferErrors) Unwrap ¶
func (te *TransferErrors) Unwrap() []error
func (*TransferErrors) UserError ¶
func (te *TransferErrors) UserError() string
Return a more refined, user-friendly error string
type TransferJob ¶
type TransferJob struct {
// contains filtered or unexported fields
}
A representation of a "transfer job". The job can be submitted to the client library, resulting in one or more transfers (if recursive is true). We assume the transfer job is potentially queued for a long time and all the transfers generated by this job will use the same namespace and token.
func (*TransferJob) GetLookupStatus ¶
func (tj *TransferJob) GetLookupStatus() (ok bool, err error)
Returns the status of the transfer job-to-file(s) lookup
ok is true if the lookup has completed.
type TransferMetadata ¶
type TransferMetadata struct {
ContentLength int64 // Size of the HTTP response body (range length for 206 Partial Content; full object for 200 OK)
ObjectSize int64 // Full object size (-1 if unknown; parsed from Content-Range for 206, same as ContentLength for 200)
ETag string // ETag header from response
LastModified time.Time // Last-Modified header from response
ContentType string // Content-Type header from response
CacheControl string // Cache-Control header from response
}
TransferMetadata contains early metadata about a transfer received from the server before the data transfer begins. This allows making decisions (e.g., ETag verification) before committing to the full transfer.
type TransferOption ¶
func WithAcquireToken ¶
func WithAcquireToken(enable bool) TransferOption
Create an option to specify the token acquisition logic
Token acquisition (e.g., using OAuth2 to get a token when one isn't found in the environment) defaults to `true` but can be disabled with this options
func WithByteRange ¶
func WithByteRange(start, end int64) TransferOption
Create an option to specify a byte range for partial object downloads
The start and end parameters are inclusive byte offsets (0-indexed). Use end=-1 to download from start to the end of the file. Example: WithByteRange(0, 1023) downloads the first 1024 bytes. Example: WithByteRange(1024, -1) downloads from byte 1024 to the end.
func WithCacheEmbeddedClientMode ¶
func WithCacheEmbeddedClientMode() TransferOption
WithCacheEmbeddedClientMode sets the client into "cache-embedded" mode. In this mode, the client queries the director's origin endpoint (/api/v1.0/director/origin/…) instead of the default shortcut endpoint. This causes the director to redirect to origins rather than to caches, which is the correct behaviour when the transfer client is itself embedded inside a cache process.
Without this option, a GET for /test/file.txt is routed through the director's shortcut middleware, which redirects to a cache. With this option, the same GET is explicitly routed to the origin endpoint so the cache can fetch from the origin.
func WithCaches ¶
func WithCaches(caches ...*url.URL) TransferOption
Create an option to override the cache list
func WithCallback ¶
func WithCallback(callback TransferCallbackFunc) TransferOption
Create an option that provides a callback for a TransferClient
The callback is invoked periodically by one of the transfer workers, with inputs of the local path (e.g., source on upload), the current bytes transferred, and the total object size
func WithCollectionsUrl ¶
func WithCollectionsUrl(url string) TransferOption
Override collections URL to be used by the TransferClient
func WithDepth ¶
func WithDepth(depth int) TransferOption
Create an option to specify the maximum depth for recursive listing
The depth parameter controls how deep the recursive listing will go. A depth of 0 means only the specified directory, 1 means one level deep, etc. A depth of -1 means unlimited depth.
func WithDestinationAcquireToken ¶
func WithDestinationAcquireToken(enable bool) TransferOption
WithDestinationAcquireToken controls automatic token acquisition for the destination side of a transfer. For put operations this is equivalent to WithAcquireToken; for get operations it is a no-op.
func WithDestinationToken ¶
func WithDestinationToken(token string) TransferOption
WithDestinationToken provides a token for the destination server in a third-party-copy transfer. For get operations, this is a no-op; for put operations it behaves identically to WithToken.
func WithDestinationTokenLocation ¶
func WithDestinationTokenLocation(location string) TransferOption
WithDestinationTokenLocation provides a token file for the destination server in a third-party-copy transfer. For get operations, this is a no-op; for put operations it behaves identically to WithTokenLocation.
func WithDryRun ¶
func WithDryRun(enable bool) TransferOption
Create an option to enable dry-run mode
When enabled, the transfer will display what would be copied without actually modifying the destination. Useful for verifying paths and sources before performing actual transfers.
func WithFedToken ¶
func WithFedToken(provider TokenProvider) TransferOption
WithFedToken provides a federation token that the client sends as an access_token query parameter on the URL to the origin. Unlike the user token (which goes in the Authorization header and is forwarded by the director via the authz query parameter), the federation token is NOT sent to the director — it is appended to the URL only after the director redirect, so it arrives at the origin as a query param. This is compatible with both Go-based and XRootD-based origins.
The provider is queried for a fresh token on each transfer attempt, so callers can pass a refreshable TokenProvider (e.g. one backed by PersistentCache.getFedToken) to handle short-lived tokens that may expire during long downloads.
func WithForcePrestageAPI ¶
func WithForcePrestageAPI(force bool) TransferOption
Create an option to force use of the Pelican prestage API
When enabled for prestage transfers, the client will return an error if the cache does not support the Pelican prestage API instead of falling back to the traditional method. This is useful for testing to ensure the API is actually being used.
func WithInPlace ¶
func WithInPlace(inPlace bool) TransferOption
Create an option to specify whether to write files in-place or use temporary files
When inPlace is false (default), files are written to temporary names and atomically renamed on success (similar to rsync's default behavior). When true, files are written directly to their final destination (similar to rsync's --inplace option).
func WithMetadataChannel ¶
func WithMetadataChannel(ch chan<- TransferMetadata) TransferOption
Create an option to receive early transfer metadata before data transfer begins
When provided, the channel will receive a TransferMetadata struct containing information like ETag, Content-Length, and Last-Modified as soon as the server response headers are received, but before any data is transferred. This allows the caller to make decisions (e.g., verify ETag matches expected) before committing to the full transfer.
The channel is optional (non-blocking send). If the channel is full or nil, the transfer will proceed without waiting. The caller should ensure the channel has buffer capacity of at least 1.
func WithReader ¶
func WithReader(reader io.ReadCloser) TransferOption
Create an option to provide an io.ReadCloser for upload source
When provided, upload data will be read from this reader instead of localPath. The reader will be closed on completion or error.
func WithRecursive ¶
func WithRecursive(recursive bool) TransferOption
Create an option to enable recursive listing
When enabled, the list operation will recursively traverse all subdirectories
func WithRequestChecksums ¶
func WithRequestChecksums(types []ChecksumType) TransferOption
Create an option to specify the checksums to request for a given transfer
func WithRequestId ¶
func WithRequestId(id string) TransferOption
WithRequestId sets a caller-supplied request ID that is propagated as the X-Pelican-JobId header on all HTTP requests made by this transfer. When set, it takes precedence over the HTCondor job-ad lookup. This is used by the cache to thread an incoming client's request ID through to the origin.
func WithRequireChecksum ¶
func WithRequireChecksum() TransferOption
Indicate that checksum verification is required
func WithSourceAcquireToken ¶
func WithSourceAcquireToken(enable bool) TransferOption
WithSourceAcquireToken controls automatic token acquisition for the source side of a transfer. For get operations this is equivalent to WithAcquireToken; for put operations it is a no-op.
func WithSourceToken ¶
func WithSourceToken(token string) TransferOption
Create an option to provide a source token for a third-party-copy transfer
func WithSourceTokenLocation ¶
func WithSourceTokenLocation(location string) TransferOption
Create an option to provide a source token location for a third-party-copy transfer
func WithSynchronize ¶
func WithSynchronize(level SyncLevel) TransferOption
Create an option to specify the object synchronization level
The synchronization level specifies what to do if the destination object already exists.
func WithToken ¶
func WithToken(token string) TransferOption
Create an option to provide a specific token to the transfer
The contents of the token will be used as part of the HTTP request
func WithTokenLocation ¶
func WithTokenLocation(location string) TransferOption
Create an option to override the token locating logic
This will force the transfer to use a specific file for the token contents instead of doing any sort of auto-detection
func WithWriter ¶
func WithWriter(writer io.WriteCloser) TransferOption
Create an option to provide an io.WriteCloser for download destination
When provided, downloaded data will be written to this writer instead of localPath. The writer will be closed on completion or error.
type TransferResult ¶
type TransferResult struct {
Number int `json:"attemptNumber"` // indicates which attempt this is
TransferFileBytes int64 `json:"transferFileBytes"` // how much each attempt downloaded
TimeToFirstByte time.Duration `json:"timeToFirstByte"` // how long it took to download the first byte
TransferEndTime time.Time `json:"transferEndTime"` // when the transfer ends
TransferTime time.Duration `json:"transferTime"` // amount of time we were transferring per attempt (in seconds)
CacheAge time.Duration `json:"cacheAge"` // age of the data reported by the cache
Endpoint string `json:"endpoint"` // which origin did it use
ServerVersion string `json:"serverVersion"` // version of the server
Error error `json:"error"` // what error the attempt returned (if any)
}
type TransferResults ¶
type TransferResults struct {
JobId uuid.UUID `json:"jobId"` // The job ID this result corresponds to
Error error `json:"error"`
TransferredBytes int64 `json:"transferredBytes"`
ETag string `json:"etag,omitempty"` // ETag from the server response (GET or PUT)
ServerChecksums []ChecksumInfo `json:"serverChecksums"` // Checksums returned by the server
ClientChecksums []ChecksumInfo `json:"clientChecksums"` // Checksums calculated by the client
TransferStartTime time.Time `json:"transferStartTime"`
Scheme string `json:"scheme"`
Source string `json:"source"`
Attempts []TransferResult `json:"attempts"`
DirectorDecision *server_structs.RedirectInfo `json:"directorDecision,omitempty"`
// contains filtered or unexported fields
}
Represents the results of a single object transfer, potentially across multiple attempts / retries.
func DoCopy ¶
func DoCopy(ctx context.Context, sourceFile string, destination string, recursive bool, options ...TransferOption) (transferResults []TransferResults, err error)
Start the transfer, whether read or write back. Primarily used for backwards compatibility
func DoGet ¶
func DoGet(ctx context.Context, remoteObject string, localDestination string, recursive bool, options ...TransferOption) (transferResults []TransferResults, err error)
Start of transfer for pelican object get, gets information from the target source before doing our HTTP GET request
remoteObject: the source file/directory you would like to upload localDestination: the end location of the upload recursive: a boolean indicating if the source is a directory or not
func DoPrestage ¶
func DoPrestage(ctx context.Context, prefixUrl string, options ...TransferOption) (transferResults []TransferResults, err error)
Single-shot call to prestage a single prefix
func DoPut ¶
func DoPut(ctx context.Context, localObject string, remoteDestination string, recursive bool, options ...TransferOption) (transferResults []TransferResults, err error)
Start of transfer for pelican object put, gets information from the target destination before doing our HTTP PUT request
localObject: the source file/directory you would like to upload remoteDestination: the end location of the upload recursive: a boolean indicating if the source is a directory or not
func (TransferResults) ID ¶
func (tr TransferResults) ID() string
type UnexpectedEOFError ¶
type UnexpectedEOFError struct {
Err error
}
func (*UnexpectedEOFError) Error ¶
func (e *UnexpectedEOFError) Error() string
func (*UnexpectedEOFError) Is ¶
func (e *UnexpectedEOFError) Is(target error) bool
func (*UnexpectedEOFError) Unwrap ¶
func (e *UnexpectedEOFError) Unwrap() error