client

package
v0.0.0-...-fe2eafc
Warning

This package is not in the latest version of its module.

Published: Jun 17, 2026 License: Apache-2.0 Imports: 63 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	PelicanError error_codes.PelicanError

	// Indicates the origin responded too slowly after the cache tried to download from it
	CacheTimedOutReadingFromOrigin = errors.New("cache timed out waiting on origin")

	// ErrObjectNotFound is returned when the requested remote object does not exist.
	ErrObjectNotFound = errors.New("remote object not found")
)
var (
	ErrServerChecksumMissing = errors.New("no checksum information was returned by server but checksums were required by the client")
)
var ObjectServersToTry int = 3

Number of caches to attempt to use in any invocation

Functions

func AcquireToken

func AcquireToken(destination *url.URL, dirResp server_structs.DirectorResponse, opts config.TokenGenerationOpts) (string, error)

Given a URL and a director Response, attempt to acquire a valid token for that URL.

func ByteCountSI

func ByteCountSI(b int64) string

Convert b bytes to a human-friendly string with SI units

For example, ByteCountSI(2000) returns "2 KB"

func CanDisableProxy

func CanDisableProxy() bool

Determine whether we are allowed to skip the proxy as a fallback

func ContextWithRequestId

func ContextWithRequestId(ctx context.Context, id string) context.Context

ContextWithRequestId returns a child context that carries the given request ID. Downstream code can retrieve it with RequestIdFromContext.
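The pairing of ContextWithRequestId and RequestIdFromContext follows the usual Go pattern of storing a value under an unexported key type. The following is a minimal sketch of that pattern, not the package's implementation; the names withRequestID, requestIDFrom, requestIDKey, and base are hypothetical stand-ins.

```go
package main

import (
	"context"
	"fmt"
)

// requestIDKey is an unexported key type, so no other package can collide with it.
type requestIDKey struct{}

// base is a root context used by the demonstration.
var base = context.Background()

// withRequestID is a hypothetical stand-in for ContextWithRequestId.
func withRequestID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, requestIDKey{}, id)
}

// requestIDFrom is a hypothetical stand-in for RequestIdFromContext.
// It returns ("", false) when no ID was stored.
func requestIDFrom(ctx context.Context) (string, bool) {
	id, ok := ctx.Value(requestIDKey{}).(string)
	return id, ok
}

func main() {
	ctx := withRequestID(base, "job-42")
	id, ok := requestIDFrom(ctx)
	fmt.Println(id, ok)
}
```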

func CreateSharingUrl

func CreateSharingUrl(ctx context.Context, objectUrl *url.URL, isWrite bool) (string, error)

func DoCacheInfo

func DoCacheInfo(ctx context.Context, destination string, options ...TransferOption) (age int, size int64, err error)

Check the cache information of a remote cache

func DoDelete

func DoDelete(ctx context.Context, remoteDestination string, recursive bool, options ...TransferOption) (err error)

DoDelete queries the director using the DELETE HTTP method, retrieves the token, and initializes the delete operation.

func DoEvict

func DoEvict(ctx context.Context, remoteObject string, immediate bool, options ...TransferOption) (message string, err error)

DoEvict evicts cached objects matching the given path (or prefix) from the local cache. Token bootstrapping follows the same logic as DoGet: tokens are discovered from the environment, credential files, or negotiated via OAuth when needed.

The remoteObject argument is a pelican:// or osdf:// URL (or a schemeless namespace path when federation discovery is configured).

When immediate is true the objects are deleted right away; otherwise they are marked for priority eviction (purge-first).

func DoShadowIngest

func DoShadowIngest(ctx context.Context, sourceFile string, originPrefix string, shadowOriginPrefix string, options ...TransferOption) (int64, string, error)

func GetBehavior

func GetBehavior(behaviorName string) (packerBehavior, error)

func GetDirectorInfoForPath

func GetDirectorInfoForPath(ctx context.Context, pUrl *pelican_url.PelicanURL, httpMethod string, token string) (parsedResponse server_structs.DirectorResponse, err error)

Retrieve federation namespace information for a given URL.

This is the public API; it always queries the director's default endpoint. Internal callers that need embedded cache-mode behaviour should use getDirectorInfoForPath instead.

func GetObjectServerHostnames

func GetObjectServerHostnames(ctx context.Context, testFile string) (urls []string, err error)

func HttpDigestFromChecksum

func HttpDigestFromChecksum(checksumType ChecksumType) string

func IsRetryable

func IsRetryable(err error) bool

IsRetryable will return true if the error is retryable

func KnownChecksumTypesAsHttpDigest

func KnownChecksumTypesAsHttpDigest() (result []string)

List all the checksum types known as HTTP digest strings

func NewTokenGenerator

func NewTokenGenerator(dest *pelican_url.PelicanURL, dirResp *server_structs.DirectorResponse, operation config.TokenOperation, enableAcquire bool) *tokenGenerator

NewTokenGenerator creates a token generator for the given destination and operation. This is the exported entry point used by cmd/token.go; most internal callers should use the unexported newTokenGenerator.

func ParseDirectorInfo

func ParseDirectorInfo(dirResp *http.Response) (server_structs.DirectorResponse, error)

Given the Director response, parse the headers and construct the ordered list of object servers.

func ParseRemoteAsPUrl

func ParseRemoteAsPUrl(ctx context.Context, rp string) (*pelican_url.PelicanURL, error)

Given a remote path, use the client's wisdom to parse it as a Pelican URL, including metadata discovery.

This will handle setting up the URL cache, passing along contexts to discovery, and passing the client context/user agent. Calling this should return a fully populated PelicanURL object, including any metadata that was discovered.

func RequestIdFromContext

func RequestIdFromContext(ctx context.Context) (string, bool)

RequestIdFromContext extracts a request ID previously stored with ContextWithRequestId. Returns ("", false) if none is set.

func ResetJobAd

func ResetJobAd()

Reset the memory-cached copy of the HTCondor job ad

The client will search through the process's environment to find a HTCondor "job ad" and cache its contents in memory; the job ad is used to determine the project name and job ID for the transfer headers.

This function is used to reset the job ad and is intended for use in unit tests that need to reset things from outside the cache package.

func ShouldRetry

func ShouldRetry(err error) bool

func WithDirectorDebug

func WithDirectorDebug(ctx context.Context) context.Context

WithDirectorDebug returns a derived context that causes director queries to include the X-Pelican-Debug header, requesting decision information.

Types

type ByteRange

type ByteRange struct {
	Start int64
	End   int64 // -1 means "to end of file"
}

ByteRange specifies a byte range for partial object transfers. Start and End are inclusive byte offsets (0-indexed). An End of -1 means "to end of file".
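As an illustration of the inclusive, 0-indexed convention, the sketch below renders a range in HTTP Range header syntax and computes its length. The type byteRange and the helpers rangeHeader and length are hypothetical; how the package itself encodes a range on the wire is not shown in this documentation.

```go
package main

import "fmt"

// byteRange is a local copy of the documented fields, for illustration only.
type byteRange struct {
	Start int64
	End   int64 // -1 means "to end of file"
}

// rangeHeader renders the range in HTTP Range syntax (an assumption about
// how such a range might be sent to a server).
func rangeHeader(r byteRange) string {
	if r.End < 0 {
		return fmt.Sprintf("bytes=%d-", r.Start)
	}
	return fmt.Sprintf("bytes=%d-%d", r.Start, r.End)
}

// length returns the number of bytes covered, or -1 for an open-ended range.
func length(r byteRange) int64 {
	if r.End < 0 {
		return -1
	}
	return r.End - r.Start + 1 // both offsets are inclusive
}

func main() {
	first := byteRange{Start: 0, End: 1023}
	rest := byteRange{Start: 1024, End: -1}
	fmt.Println(rangeHeader(first), length(first))
	fmt.Println(rangeHeader(rest), length(rest))
}
```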

type ChecksumInfo

type ChecksumInfo struct {
	Algorithm ChecksumType
	Value     []byte
}

Value of one checksum calculation

type ChecksumMismatchError

type ChecksumMismatchError struct {
	Info        ChecksumInfo // The checksum that was calculated by the client
	ServerValue []byte       // The checksum value that was calculated by the server
}

Represents a mismatched checksum

func (*ChecksumMismatchError) Error

func (e *ChecksumMismatchError) Error() string

type ChecksumType

type ChecksumType int
const (

	// The checksum algorithms supported by the client
	//
	// Note we have a helper function, KnownChecksumTypes, that returns a list
	// of all the elements enumerated below; do not skip integers in this list
	// or that functionality will break.
	//
	AlgMD5     ChecksumType = iota // Checksum is using the MD5 algorithm
	AlgCRC32C                      // Checksum is using the CRC32C algorithm
	AlgCRC32                       // Checksum is using the CRC32 algorithm
	AlgSHA1                        // Checksum is using the SHA-1 algorithm
	AlgUnknown                     // Unknown checksum algorithm.  Always a "trailer" indicating the last known algorithm.

	AlgDefault = AlgCRC32C // Default checksum algorithm is CRC32C if the client doesn't specify one.

)

func ChecksumFromHttpDigest

func ChecksumFromHttpDigest(httpDigest string) ChecksumType

func KnownChecksumTypes

func KnownChecksumTypes() (result []ChecksumType)

List all the checksum types known to the client
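The comment in the const block warns against skipping integers because a helper enumerates every value below the trailer. A minimal sketch of how such an enumeration could work is shown here; the lowercase names are local copies, and the loop is an assumption about the approach rather than the package's code.

```go
package main

import "fmt"

type checksumType int

const (
	algMD5 checksumType = iota
	algCRC32C
	algCRC32
	algSHA1
	algUnknown // trailer: one past the last known algorithm
)

// knownChecksumTypes walks the contiguous integers below the trailer.
// A gap in the iota sequence would make this loop yield an undefined value.
func knownChecksumTypes() []checksumType {
	result := make([]checksumType, 0, int(algUnknown))
	for t := checksumType(0); t < algUnknown; t++ {
		result = append(result, t)
	}
	return result
}

func main() {
	fmt.Println(len(knownChecksumTypes()))
}
```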

type ConnectionSetupError

type ConnectionSetupError struct {
	URL string
	Err error
}

ConnectionSetupError is an error that is returned when a connection to the remote server fails

func (*ConnectionSetupError) Error

func (e *ConnectionSetupError) Error() string

func (*ConnectionSetupError) Is

func (e *ConnectionSetupError) Is(target error) bool

func (*ConnectionSetupError) Unwrap

func (e *ConnectionSetupError) Unwrap() error
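An error type that provides Error, Is, and Unwrap can be inspected with the standard errors.Is and errors.As helpers even when it is wrapped. The sketch below uses a local connSetupError type to show the mechanics; the matching rule inside Is (any target of the same type matches) is an assumption, since the package's rule is not documented here.

```go
package main

import (
	"errors"
	"fmt"
)

// connSetupError is a local stand-in with the documented fields.
type connSetupError struct {
	URL string
	Err error
}

func (e *connSetupError) Error() string {
	return "failed to connect to " + e.URL + ": " + e.Err.Error()
}

// Unwrap exposes the underlying cause to errors.Is and errors.As.
func (e *connSetupError) Unwrap() error { return e.Err }

// Is matches any *connSetupError target (assumed rule for this sketch).
func (e *connSetupError) Is(target error) bool {
	_, ok := target.(*connSetupError)
	return ok
}

var errRefused = errors.New("connection refused")

// sample builds a wrapped error similar to what a caller might receive.
func sample() error {
	return fmt.Errorf("attempt 1: %w", &connSetupError{URL: "https://cache.example.org", Err: errRefused})
}

// isSetupFailure reports whether any error in the chain is a connSetupError.
func isSetupFailure(err error) bool { return errors.Is(err, &connSetupError{}) }

// failedURL extracts the URL field from the first connSetupError in the chain.
func failedURL(err error) string {
	var cse *connSetupError
	if errors.As(err, &cse) {
		return cse.URL
	}
	return ""
}

// causedBy reports whether cause appears anywhere in err's chain.
func causedBy(err, cause error) bool { return errors.Is(err, cause) }

func main() {
	err := sample()
	fmt.Println(isSetupFailure(err), failedURL(err), causedBy(err, errRefused))
}
```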

type ConstantSizer

type ConstantSizer struct {
	// contains filtered or unexported fields
}

func (*ConstantSizer) BytesComplete

func (cs *ConstantSizer) BytesComplete() int64

func (*ConstantSizer) Size

func (cs *ConstantSizer) Size() int64

type DirRespCache

type DirRespCache struct {
	// contains filtered or unexported fields
}

DirRespCache caches DirectorResponse values keyed by namespace prefix.

It supports longest-prefix matching: given a path like "/federation/data/subdir/file.txt", it will match an entry stored under the prefix "/federation/data" (but not "/federation/other").

Concurrent cache misses for paths that would map to the same singleflight key are coalesced: only one director query is issued and all waiters receive the same result.

Entries expire after a configurable TTL. The cache is safe for concurrent use.

func NewDirRespCache

func NewDirRespCache(ttl time.Duration) *DirRespCache

NewDirRespCache creates a new prefix-matching cache for director responses. Entries are considered valid for `ttl` after they are stored.

func (*DirRespCache) Invalidate

func (c *DirRespCache) Invalidate(prefix string)

Invalidate removes the entry for the given prefix.

func (*DirRespCache) InvalidateAll

func (c *DirRespCache) InvalidateAll()

InvalidateAll removes all cached entries.

func (*DirRespCache) Len

func (c *DirRespCache) Len() int

Len returns the number of entries in the cache (including expired ones that haven't been cleaned up yet).

func (*DirRespCache) Lookup

func (c *DirRespCache) Lookup(objectPath string) (server_structs.DirectorResponse, bool)

Lookup finds the longest cached prefix that matches `objectPath`.

For example, if the cache contains entries for "/a/b" and "/a", a lookup for "/a/b/c/d.txt" will return the entry for "/a/b".

Returns the cached DirectorResponse and true if a valid (non-expired) entry was found, or the zero value and false otherwise.
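One simple way to implement longest-prefix matching is to test the full path and then repeatedly drop the last path component until a live entry is found. The sketch below takes that approach with string values in place of DirectorResponse. The names prefixCache, parent, and demoTTL are hypothetical, and unlike DirRespCache this sketch is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

const demoTTL = time.Minute

type entry struct {
	value   string
	expires time.Time
}

type prefixCache struct {
	ttl     time.Duration
	entries map[string]entry
}

func newPrefixCache(ttl time.Duration) *prefixCache {
	return &prefixCache{ttl: ttl, entries: make(map[string]entry)}
}

func (c *prefixCache) store(prefix, value string) {
	c.entries[prefix] = entry{value: value, expires: time.Now().Add(c.ttl)}
}

// lookup tries the whole path first, then each shorter prefix, so the
// longest non-expired match wins. Matching is per path component.
func (c *prefixCache) lookup(objectPath string) (string, bool) {
	now := time.Now()
	for p := objectPath; p != ""; p = parent(p) {
		if e, ok := c.entries[p]; ok && now.Before(e.expires) {
			return e.value, true
		}
	}
	return "", false
}

// parent drops the last path component; it returns "" once nothing is left.
func parent(p string) string {
	i := strings.LastIndex(p, "/")
	if i <= 0 {
		return ""
	}
	return p[:i]
}

func main() {
	c := newPrefixCache(demoTTL)
	c.store("/a", "entry for /a")
	c.store("/a/b", "entry for /a/b")
	fmt.Println(c.lookup("/a/b/c/d.txt"))
}
```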

func (*DirRespCache) LookupOrLoad

func (c *DirRespCache) LookupOrLoad(ctx context.Context, objectPath string, loader DirRespLoader) (server_structs.DirectorResponse, error)

LookupOrLoad checks the cache first; on a miss it calls `loader` exactly once per unique objectPath, coalescing concurrent callers via singleflight.

If the context is cancelled while waiting for an in-flight query, the waiter returns ctx.Err() immediately. The underlying query keeps running so that other waiters (with live contexts) still receive the result.

On success the response is automatically stored in the cache under the prefix returned by the loader.
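The coalescing and cancellation behaviour described above can be sketched with the standard library alone. In this hypothetical group type, the first caller for a key starts the load in its own goroutine, later callers wait on the same result, and a waiter whose context is cancelled returns early while the load keeps running. This is a simplified illustration, not the singleflight-based code in the package.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type call struct {
	done chan struct{}
	val  string
	err  error
}

type group struct {
	mu    sync.Mutex
	calls map[string]*call
}

// do runs load at most once per key at a time. The load runs in its own
// goroutine so a cancelled waiter does not abort it for the other waiters.
func (g *group) do(ctx context.Context, key string, load func() (string, error)) (string, error) {
	g.mu.Lock()
	if g.calls == nil {
		g.calls = make(map[string]*call)
	}
	c, inFlight := g.calls[key]
	if !inFlight {
		c = &call{done: make(chan struct{})}
		g.calls[key] = c
		go func() {
			c.val, c.err = load()
			g.mu.Lock()
			delete(g.calls, key)
			g.mu.Unlock()
			close(c.done)
		}()
	}
	g.mu.Unlock()
	select {
	case <-c.done:
		return c.val, c.err
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// demo reports how often the loader ran, the value the live waiter got,
// and whether the cancelled waiter returned context.Canceled.
func demo() (loads int, val string, cancelledEarly bool) {
	var g group
	started, release := make(chan struct{}), make(chan struct{})
	load := func() (string, error) {
		loads++
		close(started)
		<-release // simulate a slow director query
		return "director response", nil
	}
	result := make(chan string)
	go func() {
		v, _ := g.do(context.Background(), "/ns", load)
		result <- v
	}()
	<-started
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, err := g.do(ctx, "/ns", load) // joins the in-flight call, then gives up
	close(release)
	v := <-result
	return loads, v, err == context.Canceled
}

func main() {
	fmt.Println(demo())
}
```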

func (*DirRespCache) Store

func (c *DirRespCache) Store(prefix string, objectPath string, resp server_structs.DirectorResponse)

Store saves a DirectorResponse under the given prefix. Any previous entry for the same prefix is replaced.

objectPath is the federation object path (e.g. "/test/file.txt") that was used to obtain this response from the director. It is stripped from each ObjectServer URL so the cached entry contains only the server-side base path. Pass "" if no stripping is needed.

type DirRespLoader

type DirRespLoader func(ctx context.Context) (resp server_structs.DirectorResponse, prefix string, err error)

DirRespLoader is a function that queries the director for a given object path. It returns the DirectorResponse and the namespace prefix that should be used as the cache key.

type FileInfo

type FileInfo struct {
	Name         string
	Size         int64
	ModTime      time.Time
	IsCollection bool
	ETag         string            `json:"etag,omitempty"`      // HTTP ETag header value
	Checksums    map[string]string `json:"checksums,omitempty"` // Checksum type (HTTP digest name) to hex-encoded value
}

FileInfo is our own structure for holding information about a file. It was created to provide more flexibility than the fs.FileInfo interface, which caused issues such as not always returning a Name attribute. The fields are exported so that they can be marshalled into JSON; marshalling does not work otherwise.

func DoList

func DoList(ctx context.Context, remoteObject string, options ...TransferOption) (fileInfos []FileInfo, err error)

DoList backs the object ls command: it looks up the target information for the remote object and returns the contents of the specified object.

func DoStat

func DoStat(ctx context.Context, destination string, options ...TransferOption) (fileInfo *FileInfo, err error)

Check the size of a remote file in an origin

type HeaderTimeoutError

type HeaderTimeoutError struct{}

func (*HeaderTimeoutError) Error

func (e *HeaderTimeoutError) Error() string

func (*HeaderTimeoutError) Is

func (e *HeaderTimeoutError) Is(target error) bool

type HttpErrResp

type HttpErrResp struct {
	Code int
	Str  string
	Err  error
}

func (*HttpErrResp) Error

func (e *HttpErrResp) Error() string

func (*HttpErrResp) Unwrap

func (e *HttpErrResp) Unwrap() error

type InvalidByteInChunkLengthError

type InvalidByteInChunkLengthError struct {
	Err error
}

func (*InvalidByteInChunkLengthError) Error

func (*InvalidByteInChunkLengthError) Is

func (*InvalidByteInChunkLengthError) Unwrap

type NetworkResetError

type NetworkResetError struct{}

func (*NetworkResetError) Error

func (e *NetworkResetError) Error() string

type PelicanFS

type PelicanFS struct {
	// contains filtered or unexported fields
}

PelicanFS implements fs.FS for the Pelican data federation. It provides a filesystem-like interface to objects stored in the federation.

func NewPelicanFS

func NewPelicanFS(ctx context.Context, options ...TransferOption) *PelicanFS

NewPelicanFS creates a new filesystem interface to the Pelican federation. The provided context is used for all operations, and the options are applied to all transfers. If urlPrefix is empty or "/", it defaults to "osdf:///".

func NewPelicanFSWithPrefix

func NewPelicanFSWithPrefix(ctx context.Context, urlPrefix string, options ...TransferOption) *PelicanFS

NewPelicanFSWithPrefix creates a new filesystem interface with a URL prefix. All paths will be relative to this prefix. If prefix is empty or "/", defaults to "osdf:///".

func (*PelicanFS) Open

func (pfs *PelicanFS) Open(name string) (fs.File, error)

Open opens the named file for reading and returns an fs.File that also implements io.ReaderAt, io.Seeker, io.Writer (for write mode), and fs.ReadDirFile (for directories).

func (*PelicanFS) OpenFile

func (pfs *PelicanFS) OpenFile(name string, flag int) (fs.File, error)

OpenFile opens the named file with specified flags. Supported flags: os.O_RDONLY, os.O_WRONLY, os.O_RDWR, os.O_CREATE

type PelicanFile

type PelicanFile struct {
	// contains filtered or unexported fields
}

PelicanFile represents an open file in the Pelican federation. It implements fs.File, io.ReaderAt, io.Seeker, io.Writer, and fs.ReadDirFile.

Thread-safety: Most fields that can change during the file's lifetime are protected by mu. The currentEndpoint field uses atomic operations for lock-free access during range reads. Functions with "Locked" suffix must be called with mu held.

func (*PelicanFile) Close

func (pf *PelicanFile) Close() error

Close closes the file, rendering it unusable for I/O. It implements fs.File.

func (*PelicanFile) Read

func (pf *PelicanFile) Read(p []byte) (n int, err error)

Read reads up to len(p) bytes into p.

func (*PelicanFile) ReadAt

func (pf *PelicanFile) ReadAt(p []byte, off int64) (n int, err error)

ReadAt reads len(p) bytes into p starting at offset off in the file. It implements io.ReaderAt. Note: ReadAt does not affect the file position.

func (*PelicanFile) ReadDir

func (pf *PelicanFile) ReadDir(n int) ([]fs.DirEntry, error)

ReadDir reads the contents of the directory and returns a slice of DirEntry values. It implements fs.ReadDirFile. Can be called multiple times to paginate through entries.

func (*PelicanFile) Seek

func (pf *PelicanFile) Seek(offset int64, whence int) (int64, error)

Seek sets the offset for the next Read operation and returns the new offset. It implements io.Seeker.

func (*PelicanFile) Stat

func (pf *PelicanFile) Stat() (fs.FileInfo, error)

Stat returns the FileInfo structure describing the file. It implements fs.File.

func (*PelicanFile) Write

func (pf *PelicanFile) Write(p []byte) (n int, err error)

Write writes len(p) bytes from p to the file. It implements io.Writer.

type PermissionDeniedError

type PermissionDeniedError struct {
	// contains filtered or unexported fields
}

PermissionDeniedError is returned when a 403 status code is received. The message is generated based on the token's validity.

func (*PermissionDeniedError) Error

func (e *PermissionDeniedError) Error() string

type ServerPriority

type ServerPriority struct {
	URL      *url.URL
	Priority int
}

type Sizer

type Sizer interface {
	Size() int64
	BytesComplete() int64
}
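A Sizer exposes just enough to report progress. As a rough sketch, the hypothetical fixedSizer type and percentComplete helper below show how a caller might turn the two methods into a percentage; neither name comes from the package.

```go
package main

import "fmt"

// sizer is a local copy of the documented interface.
type sizer interface {
	Size() int64
	BytesComplete() int64
}

// fixedSizer is a hypothetical implementation with a known total size.
type fixedSizer struct{ size, done int64 }

func (f *fixedSizer) Size() int64          { return f.size }
func (f *fixedSizer) BytesComplete() int64 { return f.done }

// percentComplete reports progress from 0 to 100 and treats a zero or
// negative size as "no progress known".
func percentComplete(s sizer) int64 {
	if s.Size() <= 0 {
		return 0
	}
	return s.BytesComplete() * 100 / s.Size()
}

func main() {
	fmt.Println(percentComplete(&fixedSizer{size: 4096, done: 1024}))
}
```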

type SlowTransferError

type SlowTransferError struct {
	BytesTransferred int64
	BytesPerSecond   int64
	BytesTotal       int64
	Duration         time.Duration
	CacheAge         time.Duration
}

SlowTransferError is an error that is returned when a transfer takes longer than the configured timeout

func (*SlowTransferError) Error

func (e *SlowTransferError) Error() (errMsg string)

func (*SlowTransferError) Is

func (e *SlowTransferError) Is(target error) bool

type StatusCodeError

type StatusCodeError int

StatusCodeError indicates the server returned a non-200 code.

The wrapper is done to provide a Pelican-based error hierarchy in case we ever decide to have a different underlying download package.

func (*StatusCodeError) Error

func (e *StatusCodeError) Error() string

func (*StatusCodeError) Is

func (e *StatusCodeError) Is(target error) bool

type StoppedTransferError

type StoppedTransferError struct {
	BytesTransferred int64
	StoppedTime      time.Duration
	CacheHit         bool
	Upload           bool
}

Error type for when the transfer started to return data then completely stopped

func (*StoppedTransferError) Error

func (e *StoppedTransferError) Error() (errMsg string)

func (*StoppedTransferError) Is

func (e *StoppedTransferError) Is(target error) bool

type SyncLevel

type SyncLevel int

Different types of synchronization for recursive transfers

const (
	SyncNone  SyncLevel = iota // When synchronizing, always re-transfer, regardless of existence at destination.
	SyncExist                  // Skip synchronization transfer if the destination exists
	SyncSize                   // Skip synchronization transfer if the destination exists and matches the current source size
)
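The three levels amount to a small decision about whether an individual transfer can be skipped. The sketch below expresses that decision as a hypothetical shouldSkip function over local copies of the constants; it follows the comments on each level and is not taken from the package source.

```go
package main

import "fmt"

type syncLevel int

const (
	syncNone  syncLevel = iota // always re-transfer
	syncExist                  // skip if the destination exists
	syncSize                   // skip if the destination exists and sizes match
)

// shouldSkip reports whether a transfer can be skipped at the given level.
func shouldSkip(level syncLevel, destExists bool, srcSize, destSize int64) bool {
	switch level {
	case syncExist:
		return destExists
	case syncSize:
		return destExists && srcSize == destSize
	default:
		return false
	}
}

func main() {
	fmt.Println(shouldSkip(syncNone, true, 10, 10))
	fmt.Println(shouldSkip(syncExist, true, 10, 99))
	fmt.Println(shouldSkip(syncSize, true, 10, 99))
}
```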

type TimestampedError

type TimestampedError struct {
	// contains filtered or unexported fields
}

func (*TimestampedError) Error

func (te *TimestampedError) Error() string

func (*TimestampedError) Unwrap

func (te *TimestampedError) Unwrap() error

type TokenProvider

type TokenProvider interface {
	Get() (string, error)
}

TokenProvider returns a token value, refreshing as needed. Implementations must be safe for concurrent use. See tokenGenerator for the standard implementation.

func StaticTokenProvider

func StaticTokenProvider(token string) TokenProvider

StaticTokenProvider returns a TokenProvider that always yields the given token string. Useful when the caller has a fixed token and does not need refresh logic.
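To illustrate the difference between a fixed token and one that is refreshed as needed, the sketch below implements a local copy of the interface twice. The cachingProvider type, its ttl and fetch fields, and the demo helper are hypothetical; the standard implementation mentioned above (tokenGenerator) is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tokenProvider is a local copy of the documented interface.
type tokenProvider interface {
	Get() (string, error)
}

// staticProvider always returns the same token.
type staticProvider string

func (s staticProvider) Get() (string, error) { return string(s), nil }

// cachingProvider calls fetch only when the cached token is missing or
// older than ttl. The mutex makes Get safe for concurrent use.
type cachingProvider struct {
	mu      sync.Mutex
	ttl     time.Duration
	fetch   func() (string, error)
	token   string
	expires time.Time
}

func (c *cachingProvider) Get() (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.token == "" || time.Now().After(c.expires) {
		tok, err := c.fetch()
		if err != nil {
			return "", err
		}
		c.token, c.expires = tok, time.Now().Add(c.ttl)
	}
	return c.token, nil
}

// demo calls Get twice and reports the token and how many times fetch ran.
func demo() (token string, fetches int) {
	var p tokenProvider = &cachingProvider{
		ttl: time.Hour,
		fetch: func() (string, error) {
			fetches++
			return fmt.Sprintf("token-%d", fetches), nil
		},
	}
	p.Get()
	token, _ = p.Get()
	return token, fetches
}

func main() {
	tok, _ := staticProvider("fixed-token").Get()
	fmt.Println(tok)
	fmt.Println(demo())
}
```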

type TransferAttemptError

type TransferAttemptError struct {
	// contains filtered or unexported fields
}

Transfer attempt error wraps an error with information about the service/proxy used

func (*TransferAttemptError) Error

func (tae *TransferAttemptError) Error() (errMsg string)

func (*TransferAttemptError) Is

func (tae *TransferAttemptError) Is(target error) bool

func (*TransferAttemptError) Unwrap

func (tae *TransferAttemptError) Unwrap() error

type TransferCallbackFunc

type TransferCallbackFunc = func(path string, downloaded int64, totalSize int64, completed bool)

type TransferClient

type TransferClient struct {
	// contains filtered or unexported fields
}

A client to the transfer engine.

func (*TransferClient) CacheInfo

func (tc *TransferClient) CacheInfo(ctx context.Context, remoteUrl *url.URL, options ...TransferOption) (age int, size int64, err error)

CacheInfo retrieves and returns the age and size of the specified object.

func (*TransferClient) Cancel

func (tc *TransferClient) Cancel()

Cancel a client

When cancelled, all channels and goroutines associated with the client will close/exit immediately.

func (*TransferClient) Close

func (tc *TransferClient) Close()

Close the transfer client object

Any subsequent job submissions will cause a panic

func (*TransferClient) NewCopyJob

func (tc *TransferClient) NewCopyJob(ctx context.Context, src *url.URL, dest *url.URL, recursive bool, options ...TransferOption) (tj *TransferJob, err error)

Create a new third-party copy job for the client.

This creates a transfer that uses the HTTP COPY verb to instruct the destination server to pull data directly from the source, without the client acting as an intermediary.

The returned object can be further customized as desired. This function does not "submit" the job for execution.

func (*TransferClient) NewPrestageJob

func (tc *TransferClient) NewPrestageJob(ctx context.Context, remoteUrl *url.URL, options ...TransferOption) (tj *TransferJob, err error)

Create a new prestage job for the client

The returned object can be further customized as desired. This function does not "submit" the job for execution.

func (*TransferClient) NewTransferJob

func (tc *TransferClient) NewTransferJob(ctx context.Context, remoteUrl *url.URL, localPath string, upload bool, recursive bool, options ...TransferOption) (tj *TransferJob, err error)

Create a new transfer job for the client

The returned object can be further customized as desired. This function does not "submit" the job for execution.

func (*TransferClient) Results

func (tc *TransferClient) Results() chan TransferResults

Return a channel containing the results from the client

func (*TransferClient) Shutdown

func (tc *TransferClient) Shutdown() (results []TransferResults, err error)

Shutdown the transfer client

Closes the client and waits for all jobs to exit cleanly. Returns any results that were pending when Shutdown was called

func (*TransferClient) Submit

func (tc *TransferClient) Submit(tj *TransferJob) error

Submit the transfer job to the client for processing

type TransferEngine

type TransferEngine struct {
	// contains filtered or unexported fields
}

An object able to process transfer jobs.

func NewTransferEngine

func NewTransferEngine(ctx context.Context) (te *TransferEngine, err error)

Returns a new transfer engine object whose lifetime is tied to the provided context. It launches worker goroutines to handle the underlying transfers.

func (*TransferEngine) Close

func (te *TransferEngine) Close()

Closes the TransferEngine. No new work may be submitted. Any ongoing work will continue

func (*TransferEngine) NewClient

func (te *TransferEngine) NewClient(options ...TransferOption) (client *TransferClient, err error)

Create a new client to work with an engine

func (*TransferEngine) Shutdown

func (te *TransferEngine) Shutdown() error

Initiates a shutdown of the transfer engine. Waits until all workers have finished

type TransferErrors

type TransferErrors struct {
	// contains filtered or unexported fields
}

A container object for multiple sub-errors representing transfer failures.

func NewTransferErrors

func NewTransferErrors() *TransferErrors

Create a new transfer error object

func (*TransferErrors) AddError

func (te *TransferErrors) AddError(err error)

func (*TransferErrors) AddPastError

func (te *TransferErrors) AddPastError(err error, timestamp time.Time)

func (*TransferErrors) AllErrorsRetryable

func (te *TransferErrors) AllErrorsRetryable() bool

Returns true if all errors are retryable. If no errors are present, then returns true
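A container whose Unwrap method returns []error lets errors.Is and errors.As search every sub-error (Go 1.20 and later). The sketch below shows that mechanism together with the "empty means retryable" rule. The transferErrors type, the two sentinel errors, and the retry rule are hypothetical simplifications.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

var (
	errTimeout = errors.New("timed out")         // treated as retryable in this sketch
	errDenied  = errors.New("permission denied") // treated as fatal in this sketch
)

type transferErrors struct{ errs []error }

func (te *transferErrors) add(err error) { te.errs = append(te.errs, err) }

func (te *transferErrors) Error() string {
	msgs := make([]string, len(te.errs))
	for i, err := range te.errs {
		msgs[i] = err.Error()
	}
	return strings.Join(msgs, "; ")
}

// Unwrap returns every sub-error so errors.Is and errors.As inspect each one.
func (te *transferErrors) Unwrap() []error { return te.errs }

// allRetryable is vacuously true when the container is empty.
func (te *transferErrors) allRetryable() bool {
	for _, err := range te.errs {
		if !errors.Is(err, errTimeout) {
			return false
		}
	}
	return true
}

// contains reports whether target appears anywhere in err's tree.
func contains(err, target error) bool { return errors.Is(err, target) }

func main() {
	te := &transferErrors{}
	fmt.Println(te.allRetryable())
	te.add(fmt.Errorf("cache 1: %w", errTimeout))
	te.add(errDenied)
	fmt.Println(te.allRetryable(), contains(te, errDenied))
}
```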

func (*TransferErrors) Error

func (te *TransferErrors) Error() string

func (*TransferErrors) Unwrap

func (te *TransferErrors) Unwrap() []error

func (*TransferErrors) UserError

func (te *TransferErrors) UserError() string

Return a more refined, user-friendly error string

type TransferJob

type TransferJob struct {
	// contains filtered or unexported fields
}

A representation of a "transfer job". The job can be submitted to the client library, resulting in one or more transfers (if recursive is true). We assume the transfer job is potentially queued for a long time and all the transfers generated by this job will use the same namespace and token.

func (*TransferJob) Cancel

func (tj *TransferJob) Cancel()

Cancel the transfer job

func (*TransferJob) GetLookupStatus

func (tj *TransferJob) GetLookupStatus() (ok bool, err error)

Returns the status of the transfer job-to-file(s) lookup

ok is true if the lookup has completed.

func (*TransferJob) ID

func (tj *TransferJob) ID() string

Get the transfer's ID

type TransferMetadata

type TransferMetadata struct {
	ContentLength int64     // Size of the HTTP response body (range length for 206 Partial Content; full object for 200 OK)
	ObjectSize    int64     // Full object size (-1 if unknown; parsed from Content-Range for 206, same as ContentLength for 200)
	ETag          string    // ETag header from response
	LastModified  time.Time // Last-Modified header from response
	ContentType   string    // Content-Type header from response
	CacheControl  string    // Cache-Control header from response
}

TransferMetadata contains early metadata about a transfer received from the server before the data transfer begins. This allows making decisions (e.g., ETag verification) before committing to the full transfer.
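The field comments note that, for a 206 response, the full object size is parsed from Content-Range and is -1 when unknown. A minimal sketch of such a parse follows; the helper name objectSize is hypothetical, and the handling of malformed input is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// objectSize extracts the full object size from a Content-Range value such
// as "bytes 0-1023/4096". It returns -1 when the size is "*" or cannot be
// parsed.
func objectSize(contentRange string) int64 {
	_, total, found := strings.Cut(contentRange, "/")
	if !found {
		return -1
	}
	n, err := strconv.ParseInt(strings.TrimSpace(total), 10, 64)
	if err != nil || n < 0 {
		return -1
	}
	return n
}

func main() {
	fmt.Println(objectSize("bytes 0-1023/4096"))
	fmt.Println(objectSize("bytes 0-1023/*"))
}
```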

type TransferOption

type TransferOption = option.Interface

func WithAcquireToken

func WithAcquireToken(enable bool) TransferOption

Create an option to specify the token acquisition logic

Token acquisition (e.g., using OAuth2 to get a token when one isn't found in the environment) defaults to `true` but can be disabled with this option.

func WithByteRange

func WithByteRange(start, end int64) TransferOption

Create an option to specify a byte range for partial object downloads

The start and end parameters are inclusive byte offsets (0-indexed). Use end=-1 to download from start to the end of the file. For example, WithByteRange(0, 1023) downloads the first 1024 bytes, and WithByteRange(1024, -1) downloads from byte 1024 to the end.

func WithCacheEmbeddedClientMode

func WithCacheEmbeddedClientMode() TransferOption

WithCacheEmbeddedClientMode sets the client into "cache-embedded" mode. In this mode, the client queries the director's origin endpoint (/api/v1.0/director/origin/…) instead of the default shortcut endpoint. This causes the director to redirect to origins rather than to caches, which is the correct behaviour when the transfer client is itself embedded inside a cache process.

Without this option, a GET for /test/file.txt is routed through the director's shortcut middleware, which redirects to a cache. With this option, the same GET is explicitly routed to the origin endpoint so the cache can fetch from the origin.

func WithCaches

func WithCaches(caches ...*url.URL) TransferOption

Create an option to override the cache list

func WithCallback

func WithCallback(callback TransferCallbackFunc) TransferOption

Create an option that provides a callback for a TransferClient

The callback is invoked periodically by one of the transfer workers, with inputs of the local path (e.g., source on upload), the current bytes transferred, and the total object size

func WithCollectionsUrl

func WithCollectionsUrl(url string) TransferOption

Override collections URL to be used by the TransferClient

func WithDepth

func WithDepth(depth int) TransferOption

Create an option to specify the maximum depth for recursive listing

The depth parameter controls how deep the recursive listing will go. A depth of 0 means only the specified directory, 1 means one level deep, etc. A depth of -1 means unlimited depth.
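The depth semantics can be sketched as a recursive walk that stops descending once the remaining depth reaches 0 and never stops when it is negative. The in-memory tree and the list function below are hypothetical and stand in for a real remote listing.

```go
package main

import "fmt"

// tree maps each directory to its entries (a toy stand-in for a remote listing).
var tree = map[string][]string{
	"/data":          {"/data/a.txt", "/data/sub"},
	"/data/sub":      {"/data/sub/b.txt", "/data/sub/deep"},
	"/data/sub/deep": {"/data/sub/deep/c.txt"},
}

// list returns the entries of dir, descending into subdirectories while
// depth allows: 0 lists only dir itself, -1 means unlimited.
func list(dir string, depth int) []string {
	var out []string
	for _, child := range tree[dir] {
		out = append(out, child)
		if _, isDir := tree[child]; isDir && depth != 0 {
			next := depth - 1
			if depth < 0 {
				next = -1 // keep unlimited depth unlimited
			}
			out = append(out, list(child, next)...)
		}
	}
	return out
}

func main() {
	fmt.Println(len(list("/data", 0)), len(list("/data", 1)), len(list("/data", -1)))
}
```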

func WithDestinationAcquireToken

func WithDestinationAcquireToken(enable bool) TransferOption

WithDestinationAcquireToken controls automatic token acquisition for the destination side of a transfer. For put operations this is equivalent to WithAcquireToken; for get operations it is a no-op.

func WithDestinationToken

func WithDestinationToken(token string) TransferOption

WithDestinationToken provides a token for the destination server in a third-party-copy transfer. For get operations, this is a no-op; for put operations it behaves identically to WithToken.

func WithDestinationTokenLocation

func WithDestinationTokenLocation(location string) TransferOption

WithDestinationTokenLocation provides a token file for the destination server in a third-party-copy transfer. For get operations, this is a no-op; for put operations it behaves identically to WithTokenLocation.

func WithDryRun

func WithDryRun(enable bool) TransferOption

Create an option to enable dry-run mode

When enabled, the transfer will display what would be copied without actually modifying the destination. Useful for verifying paths and sources before performing actual transfers.

func WithFedToken

func WithFedToken(provider TokenProvider) TransferOption

WithFedToken provides a federation token that the client sends as an access_token query parameter on the URL to the origin. Unlike the user token (which goes in the Authorization header and is forwarded by the director via the authz query parameter), the federation token is NOT sent to the director — it is appended to the URL only after the director redirect, so it arrives at the origin as a query param. This is compatible with both Go-based and XRootD-based origins.

The provider is queried for a fresh token on each transfer attempt, so callers can pass a refreshable TokenProvider (e.g. one backed by PersistentCache.getFedToken) to handle short-lived tokens that may expire during long downloads.
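As a sketch of the mechanism described above, the helper below appends an access_token query parameter to a post-redirect URL and asks a provider function for a fresh token on every call. The function name appendFedToken and the example URL are hypothetical; only the access_token parameter name comes from the text.

```go
package main

import (
	"fmt"
	"net/url"
)

// appendFedToken adds access_token to the URL obtained after the director
// redirect. getToken is called on every attempt so a refreshed token is
// picked up.
func appendFedToken(redirect string, getToken func() (string, error)) (string, error) {
	u, err := url.Parse(redirect)
	if err != nil {
		return "", err
	}
	tok, err := getToken()
	if err != nil {
		return "", err
	}
	q := u.Query()
	q.Set("access_token", tok)
	u.RawQuery = q.Encode() // Encode sorts parameters by key
	return u.String(), nil
}

func main() {
	out, err := appendFedToken(
		"https://origin.example.org:8443/data/file.txt?foo=bar",
		func() (string, error) { return "abc", nil },
	)
	fmt.Println(out, err)
}
```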

func WithForcePrestageAPI

func WithForcePrestageAPI(force bool) TransferOption

Create an option to force use of the Pelican prestage API

When enabled for prestage transfers, the client will return an error if the cache does not support the Pelican prestage API instead of falling back to the traditional method. This is useful for testing to ensure the API is actually being used.

func WithInPlace

func WithInPlace(inPlace bool) TransferOption

Create an option to specify whether to write files in-place or use temporary files

When inPlace is false (default), files are written to temporary names and atomically renamed on success (similar to rsync's default behavior). When true, files are written directly to their final destination (similar to rsync's --inplace option).
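The two write strategies can be sketched with the standard library: write to a temporary file in the destination directory and rename it on success, or write straight to the destination. The helper names writeFile and roundTrip and the temporary-file naming pattern are hypothetical.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeFile stores data at dest. With inPlace false it writes a temporary
// file in the same directory and renames it, so readers never observe a
// partially written dest.
func writeFile(dest string, data []byte, inPlace bool) error {
	if inPlace {
		return os.WriteFile(dest, data, 0o644)
	}
	tmp, err := os.CreateTemp(filepath.Dir(dest), ".partial-*")
	if err != nil {
		return err
	}
	defer os.Remove(tmp.Name()) // cleans up on failure; harmless after rename
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmp.Name(), dest)
}

// roundTrip writes a payload into a scratch directory and reads it back.
func roundTrip(inPlace bool) (string, error) {
	dir, err := os.MkdirTemp("", "inplace-demo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	dest := filepath.Join(dir, "object.dat")
	if err := writeFile(dest, []byte("payload"), inPlace); err != nil {
		return "", err
	}
	got, err := os.ReadFile(dest)
	return string(got), err
}

func main() {
	fmt.Println(roundTrip(false))
	fmt.Println(roundTrip(true))
}
```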

func WithMetadataChannel

func WithMetadataChannel(ch chan<- TransferMetadata) TransferOption

Create an option to receive early transfer metadata before data transfer begins

When provided, the channel will receive a TransferMetadata struct containing information like ETag, Content-Length, and Last-Modified as soon as the server response headers are received, but before any data is transferred. This allows the caller to make decisions (e.g., verify ETag matches expected) before committing to the full transfer.

The channel is optional (non-blocking send). If the channel is full or nil, the transfer will proceed without waiting. The caller should ensure the channel has buffer capacity of at least 1.
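A non-blocking send is typically written as a select with a default case, which also covers a nil channel because a send on nil is never ready. The sketch below shows why a buffer of at least 1 matters; the metadata type and trySend helper are hypothetical.

```go
package main

import "fmt"

// metadata is a trimmed-down stand-in for TransferMetadata.
type metadata struct {
	ETag          string
	ContentLength int64
}

// trySend delivers m if the channel can accept it right now and reports
// whether it did. It never blocks, even for a nil or full channel.
func trySend(ch chan<- metadata, m metadata) bool {
	select {
	case ch <- m:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan metadata, 1) // buffer of 1 so the send can succeed before a receiver is ready
	fmt.Println(trySend(ch, metadata{ETag: "abc", ContentLength: 1024}))
	fmt.Println(trySend(ch, metadata{ETag: "dropped"}))
	fmt.Println((<-ch).ETag)
}
```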

func WithReader

func WithReader(reader io.ReadCloser) TransferOption

Create an option to provide an io.ReadCloser for upload source

When provided, upload data will be read from this reader instead of localPath. The reader will be closed on completion or error.

func WithRecursive

func WithRecursive(recursive bool) TransferOption

Create an option to enable recursive listing

When enabled, the list operation will recursively traverse all subdirectories

func WithRequestChecksums

func WithRequestChecksums(types []ChecksumType) TransferOption

Create an option to specify the checksums to request for a given transfer

func WithRequestId

func WithRequestId(id string) TransferOption

WithRequestId sets a caller-supplied request ID that is propagated as the X-Pelican-JobId header on all HTTP requests made by this transfer. When set, it takes precedence over the HTCondor job-ad lookup. This is used by the cache to thread an incoming client's request ID through to the origin.

func WithRequireChecksum

func WithRequireChecksum() TransferOption

Indicate that checksum verification is required

func WithSourceAcquireToken

func WithSourceAcquireToken(enable bool) TransferOption

WithSourceAcquireToken controls automatic token acquisition for the source side of a transfer. For get operations this is equivalent to WithAcquireToken; for put operations it is a no-op.

func WithSourceToken

func WithSourceToken(token string) TransferOption

Create an option to provide a source token for a third-party-copy transfer

func WithSourceTokenLocation

func WithSourceTokenLocation(location string) TransferOption

Create an option to provide a source token location for a third-party-copy transfer

func WithSynchronize

func WithSynchronize(level SyncLevel) TransferOption

Create an option to specify the object synchronization level

The synchronization level specifies what to do if the destination object already exists.

func WithToken

func WithToken(token string) TransferOption

Create an option to provide a specific token to the transfer

The contents of the token will be used as part of the HTTP request

func WithTokenLocation

func WithTokenLocation(location string) TransferOption

Create an option to override the token locating logic

This forces the transfer to read the token contents from the specified file instead of performing any auto-detection.

func WithWriter

func WithWriter(writer io.WriteCloser) TransferOption

Create an option to provide an io.WriteCloser for download destination

When provided, downloaded data will be written to this writer instead of localPath. The writer will be closed on completion or error.

type TransferResult

type TransferResult struct {
	Number            int           `json:"attemptNumber"`     // indicates which attempt this is
	TransferFileBytes int64         `json:"transferFileBytes"` // how much each attempt downloaded
	TimeToFirstByte   time.Duration `json:"timeToFirstByte"`   // how long it took to download the first byte
	TransferEndTime   time.Time     `json:"transferEndTime"`   // when the transfer ends
	TransferTime      time.Duration `json:"transferTime"`      // amount of time we were transferring per attempt (in seconds)
	CacheAge          time.Duration `json:"cacheAge"`          // age of the data reported by the cache
	Endpoint          string        `json:"endpoint"`          // which origin did it use
	ServerVersion     string        `json:"serverVersion"`     // version of the server
	Error             error         `json:"error"`             // what error the attempt returned (if any)
}

type TransferResults

type TransferResults struct {
	JobId uuid.UUID `json:"jobId"` // The job ID this result corresponds to

	Error             error                        `json:"error"`
	TransferredBytes  int64                        `json:"transferredBytes"`
	ETag              string                       `json:"etag,omitempty"`  // ETag from the server response (GET or PUT)
	ServerChecksums   []ChecksumInfo               `json:"serverChecksums"` // Checksums returned by the server
	ClientChecksums   []ChecksumInfo               `json:"clientChecksums"` // Checksums calculated by the client
	TransferStartTime time.Time                    `json:"transferStartTime"`
	Scheme            string                       `json:"scheme"`
	Source            string                       `json:"source"`
	Attempts          []TransferResult             `json:"attempts"`
	DirectorDecision  *server_structs.RedirectInfo `json:"directorDecision,omitempty"`
	// contains filtered or unexported fields
}

Represents the results of a single object transfer, potentially across multiple attempts / retries.
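Since one result can span several attempts, a caller may want to walk the attempt list. The sketch below uses a hypothetical, trimmed-down `attempt` struct rather than the real TransferResult type, and makes no claim about how the package itself aggregates byte counts.

```go
package main

import (
	"errors"
	"fmt"
)

// attempt is a hypothetical, trimmed-down mirror of a per-attempt record.
type attempt struct {
	Number   int
	Bytes    int64
	Endpoint string
	Err      error
}

// summarize returns the bytes moved across all attempts and the endpoint
// of the first attempt that finished without an error. ok is false when
// every attempt failed.
func summarize(attempts []attempt) (total int64, endpoint string, ok bool) {
	for _, a := range attempts {
		total += a.Bytes
		if a.Err == nil && !ok {
			endpoint, ok = a.Endpoint, true
		}
	}
	return total, endpoint, ok
}

// demoSummary models one stalled attempt followed by a successful retry.
// The endpoint names are made up for the example.
func demoSummary() (int64, string, bool) {
	return summarize([]attempt{
		{Number: 0, Bytes: 50, Endpoint: "cache-a.example", Err: errors.New("stalled")},
		{Number: 1, Bytes: 100, Endpoint: "cache-b.example"},
	})
}

func main() {
	total, endpoint, ok := demoSummary()
	fmt.Println(total, endpoint, ok)
}
```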

func DoCopy

func DoCopy(ctx context.Context, sourceFile string, destination string, recursive bool, options ...TransferOption) (transferResults []TransferResults, err error)

Start a transfer in either direction (read or write back). Retained primarily for backwards compatibility.

func DoGet

func DoGet(ctx context.Context, remoteObject string, localDestination string, recursive bool, options ...TransferOption) (transferResults []TransferResults, err error)

Start a transfer for a Pelican object get. Information is gathered from the target source before the HTTP GET request is issued.

remoteObject: the remote source file/directory you would like to download

localDestination: the local end location of the download

recursive: a boolean indicating whether the source is a directory

func DoPrestage

func DoPrestage(ctx context.Context, prefixUrl string, options ...TransferOption) (transferResults []TransferResults, err error)

Single-shot call to prestage a single prefix

func DoPut

func DoPut(ctx context.Context, localObject string, remoteDestination string, recursive bool, options ...TransferOption) (transferResults []TransferResults, err error)

Start a transfer for a Pelican object put. Information is gathered from the target destination before the HTTP PUT request is issued.

localObject: the source file/directory you would like to upload

remoteDestination: the end location of the upload

recursive: a boolean indicating whether the source is a directory

func (TransferResults) ID

func (tr TransferResults) ID() string

type UnexpectedEOFError

type UnexpectedEOFError struct {
	Err error
}

func (*UnexpectedEOFError) Error

func (e *UnexpectedEOFError) Error() string

func (*UnexpectedEOFError) Is

func (e *UnexpectedEOFError) Is(target error) bool

func (*UnexpectedEOFError) Unwrap

func (e *UnexpectedEOFError) Unwrap() error
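An error type that exposes Error, Is, and Unwrap can be matched with the standard `errors` helpers. The sketch below defines its own hypothetical `truncatedError` type; it assumes the Is method matches `io.ErrUnexpectedEOF`, which may or may not be how UnexpectedEOFError behaves.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// truncatedError is a hypothetical wrapper with the same method set as
// the type documented above; its behavior is an assumption.
type truncatedError struct {
	Err error
}

func (e *truncatedError) Error() string {
	if e.Err == nil {
		return "unexpected EOF"
	}
	return "unexpected EOF: " + e.Err.Error()
}

// Is lets errors.Is(err, io.ErrUnexpectedEOF) succeed even though the
// wrapped cause is a different error value.
func (e *truncatedError) Is(target error) bool {
	return target == io.ErrUnexpectedEOF
}

// Unwrap exposes the underlying cause to errors.Is and errors.As.
func (e *truncatedError) Unwrap() error { return e.Err }

// isTruncated reports whether any error in err's chain counts as an
// unexpected EOF.
func isTruncated(err error) bool {
	return errors.Is(err, io.ErrUnexpectedEOF)
}

// demoErr builds a chain: context message -> truncatedError -> cause.
func demoErr() error {
	cause := errors.New("connection reset by peer")
	return fmt.Errorf("attempt 1: %w", &truncatedError{Err: cause})
}

func main() {
	err := demoErr()
	fmt.Println(isTruncated(err))                     // true
	fmt.Println(isTruncated(errors.New("not found"))) // false
}
```

Matching through `errors.Is` rather than a type assertion keeps callers working when the error is wrapped with additional context.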
