tq

package
v2.13.1-0...-763252b
Warning

This package is not in the latest version of its module.

Published: Aug 26, 2021 License: BSD-3-Clause, MIT Imports: 30 Imported by: 0

Documentation

Overview

Package transfer collects together adapters for uploading and downloading LFS content.

NOTE: Subject to change; do not rely on this package from outside the git-lfs source.

Constants

const (
	Upload   = Direction(iota)
	Download = Direction(iota)
	Checkout = Direction(iota)
)
const (
	TusAdapterName = "tus"
	TusVersion     = "1.0.0"
)
const (
	BasicAdapterName = "basic"
)

Variables

This section is empty.

Functions

func IsActionExpiredError

func IsActionExpiredError(err error) bool

func NewCustomAdapterDownloadRequest

func NewCustomAdapterDownloadRequest(oid string, size int64, action *Action) *customAdapterTransferRequest

func NewCustomAdapterInitRequest

func NewCustomAdapterInitRequest(
	op string, remote string, concurrent bool, concurrentTransfers int,
) *customAdapterInitRequest

func NewCustomAdapterTerminateRequest

func NewCustomAdapterTerminateRequest() *customAdapterTerminateRequest

func NewCustomAdapterUploadRequest

func NewCustomAdapterUploadRequest(oid string, size int64, path string, action *Action) *customAdapterTransferRequest

Types

type Action

type Action struct {
	Href      string            `json:"href"`
	Header    map[string]string `json:"header,omitempty"`
	ExpiresAt time.Time         `json:"expires_at,omitempty"`
	ExpiresIn int               `json:"expires_in,omitempty"`
	Id        string            `json:"-"`
	Token     string            `json:"-"`
	// contains filtered or unexported fields
}

func (*Action) IsExpiredWithin

func (a *Action) IsExpiredWithin(d time.Duration) (time.Time, bool)

type ActionExpiredErr

type ActionExpiredErr struct {
	Rel string
	At  time.Time
}

func (ActionExpiredErr) Error

func (e ActionExpiredErr) Error() string

type ActionSet

type ActionSet map[string]*Action

func (ActionSet) Get

func (as ActionSet) Get(rel string) (*Action, error)

type Adapter

type Adapter interface {
	// Name returns the name of this adapter, which is the same for all instances
	// of this type of adapter
	Name() string
	// Direction returns whether this instance is an upload or download instance
	// Adapter instances can only be one or the other, although the same
	// type may be instantiated for each direction
	Direction() Direction
	// Begin a new batch of uploads or downloads. Call this first, followed by one
	// or more Add calls. The passed in callback will receive updates on progress.
	Begin(cfg AdapterConfig, cb ProgressCallback) error
	// Add queues a download/upload, which will complete asynchronously and
	// notify the callbacks given to Begin()
	Add(transfers ...*Transfer) (results <-chan TransferResult)
	// Indicate that all transfers have been scheduled and resources can be released
	// once the queued items have completed.
	// This call blocks until all items have been processed
	End()
}

Adapter is implemented by types which can upload and/or download LFS file content to a remote store. Each Adapter accepts one or more requests, which it may schedule and parallelise in whatever way it chooses; clients of this interface receive notifications of progress and completion asynchronously. An Adapter supports transfers in one direction only; if an implementation provides support for both upload and download, it should be instantiated twice, advertising support for each direction separately.

Note that Adapter only implements the actual upload/download of content itself; organising the wider process, including calling the API to get URLs, handling progress reporting and retries, is the job of the core TransferQueue. This keeps the orchestration core and standard, while an Adapter can be swapped in to physically transfer to different hosts with less code.
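
The Begin/Add/End contract described above can be sketched with local stand-in types. This is a minimal illustration, not the git-lfs implementation: `memoryAdapter` and `runBatch` are hypothetical names, the types are simplified copies declared locally, and Begin omits the AdapterConfig argument that the real interface takes.

```go
package main

import (
	"fmt"
	"sync"
)

// Simplified local stand-ins for the tq types (assumptions, not the real definitions).
type Direction int

const (
	Upload Direction = iota
	Download
)

type Transfer struct {
	Oid  string
	Size int64
}

type TransferResult struct {
	Transfer *Transfer
	Error    error
}

type ProgressCallback func(name string, totalSize, readSoFar int64, readSinceLast int) error

// memoryAdapter is a hypothetical adapter whose "transfer" only reports progress.
type memoryAdapter struct {
	dir Direction
	cb  ProgressCallback
	wg  sync.WaitGroup
}

func (a *memoryAdapter) Name() string         { return "memory" }
func (a *memoryAdapter) Direction() Direction { return a.dir }

// Begin records the progress callback for the batch.
func (a *memoryAdapter) Begin(cb ProgressCallback) error {
	a.cb = cb
	return nil
}

// Add schedules each transfer on its own goroutine and returns a channel that
// receives one result per transfer and is closed once they are all delivered.
func (a *memoryAdapter) Add(transfers ...*Transfer) <-chan TransferResult {
	results := make(chan TransferResult, len(transfers))
	var batch sync.WaitGroup
	for _, t := range transfers {
		a.wg.Add(1)
		batch.Add(1)
		go func(t *Transfer) {
			defer a.wg.Done()
			defer batch.Done()
			var err error
			if a.cb != nil {
				err = a.cb(t.Oid, t.Size, t.Size, int(t.Size))
			}
			results <- TransferResult{Transfer: t, Error: err}
		}(t)
	}
	go func() {
		batch.Wait()
		close(results)
	}()
	return results
}

// End blocks until every queued transfer has been processed.
func (a *memoryAdapter) End() { a.wg.Wait() }

// runBatch drives one Begin/Add/End cycle and returns the number of
// successful results and the total bytes reported through the callback.
func runBatch(sizes ...int64) (completed int, reported int64) {
	var mu sync.Mutex
	a := &memoryAdapter{dir: Download}
	a.Begin(func(name string, total, soFar int64, sinceLast int) error {
		mu.Lock()
		defer mu.Unlock()
		reported += int64(sinceLast)
		return nil
	})
	transfers := make([]*Transfer, len(sizes))
	for i, s := range sizes {
		transfers[i] = &Transfer{Oid: fmt.Sprintf("oid-%d", i), Size: s}
	}
	for r := range a.Add(transfers...) {
		if r.Error == nil {
			completed++
		}
	}
	a.End()
	return completed, reported
}

func main() {
	completed, reported := runBatch(10, 20, 30)
	fmt.Println(completed, reported)
}
```

The caller never learns how the adapter parallelised the work; it only sees results arrive on the channel, which is the separation between adapter and queue that the description draws.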

type AdapterConfig

type AdapterConfig interface {
	APIClient() *lfsapi.Client
	ConcurrentTransfers() int
	Remote() string
}

type BatchClient

type BatchClient interface {
	Batch(remote string, bReq *batchRequest) (*BatchResponse, error)
	MaxRetries() int
	SetMaxRetries(n int)
}

type BatchResponse

type BatchResponse struct {
	Objects             []*Transfer `json:"objects"`
	TransferAdapterName string      `json:"transfer"`
	// contains filtered or unexported fields
}

func Batch

func Batch(m *Manifest, dir Direction, remote string, remoteRef *git.Ref, objects []*Transfer) (*BatchResponse, error)

type Direction

type Direction int

func (Direction) String

func (d Direction) String() string

func (Direction) Verb

func (d Direction) Verb() string

Verb returns a string containing the verb form of the receiving action.

type Env

type Env interface {
	Get(key string) (val string, ok bool)
	GetAll(key string) []string
	Bool(key string, def bool) (val bool)
	Int(key string, def int) (val int)
	All() map[string][]string
}

Env is any object with a config.Environment interface.

type MalformedObjectError

type MalformedObjectError struct {
	Name string
	Oid  string
	// contains filtered or unexported fields
}

func (MalformedObjectError) Corrupt

func (e MalformedObjectError) Corrupt() bool

func (MalformedObjectError) Error

func (e MalformedObjectError) Error() string

func (MalformedObjectError) Missing

func (e MalformedObjectError) Missing() bool

type Manifest

type Manifest struct {
	// contains filtered or unexported fields
}

func NewManifest

func NewManifest(f *fs.Filesystem, apiClient *lfsapi.Client, operation, remote string) *Manifest

func (*Manifest) APIClient

func (m *Manifest) APIClient() *lfsapi.Client

func (*Manifest) ConcurrentTransfers

func (m *Manifest) ConcurrentTransfers() int

func (*Manifest) GetAdapterNames

func (m *Manifest) GetAdapterNames(dir Direction) []string

GetAdapterNames returns a list of the names of adapters available to be created

func (*Manifest) GetDownloadAdapterNames

func (m *Manifest) GetDownloadAdapterNames() []string

GetDownloadAdapterNames returns a list of the names of download adapters available to be created

func (*Manifest) GetUploadAdapterNames

func (m *Manifest) GetUploadAdapterNames() []string

GetUploadAdapterNames returns a list of the names of upload adapters available to be created

func (*Manifest) IsStandaloneTransfer

func (m *Manifest) IsStandaloneTransfer() bool

func (*Manifest) MaxRetries

func (m *Manifest) MaxRetries() int

func (*Manifest) MaxRetryDelay

func (m *Manifest) MaxRetryDelay() int

func (*Manifest) NewAdapter

func (m *Manifest) NewAdapter(name string, dir Direction) Adapter

NewAdapter creates a new adapter by name and direction, or returns nil if no such adapter exists.

func (*Manifest) NewAdapterOrDefault

func (m *Manifest) NewAdapterOrDefault(name string, dir Direction) Adapter

NewAdapterOrDefault creates a new adapter by name and direction, defaulting to BasicAdapterName if no such adapter exists.

func (*Manifest) NewDownloadAdapter

func (m *Manifest) NewDownloadAdapter(name string) Adapter

NewDownloadAdapter creates a new download adapter by name, defaulting to BasicAdapterName if no such adapter exists.

func (*Manifest) NewUploadAdapter

func (m *Manifest) NewUploadAdapter(name string) Adapter

NewUploadAdapter creates a new upload adapter by name, defaulting to BasicAdapterName if no such adapter exists.

func (*Manifest) RegisterNewAdapterFunc

func (m *Manifest) RegisterNewAdapterFunc(name string, dir Direction, f NewAdapterFunc)

RegisterNewAdapterFunc registers a new function for creating upload or download adapters. If a function with that name and direction is already registered, it is overridden.
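
The lookup rules documented for RegisterNewAdapterFunc, NewAdapter and NewAdapterOrDefault (override on re-registration, nil when missing, fall back to "basic") can be sketched as a small registry. The `registry` and `namedAdapter` types below are hypothetical stand-ins; how Manifest actually stores its constructor functions is not shown on this page.

```go
package main

import "fmt"

type Direction int

const (
	Upload Direction = iota
	Download
)

// Adapter is reduced to Name() for this sketch.
type Adapter interface{ Name() string }

type namedAdapter struct{ name string }

func (a namedAdapter) Name() string { return a.name }

type NewAdapterFunc func(name string, dir Direction) Adapter

// registry is a hypothetical stand-in for the bookkeeping inside Manifest.
type registry struct {
	funcs map[Direction]map[string]NewAdapterFunc
}

func newRegistry() *registry {
	return &registry{funcs: map[Direction]map[string]NewAdapterFunc{}}
}

// Register stores f under (name, dir), replacing any earlier registration.
func (r *registry) Register(name string, dir Direction, f NewAdapterFunc) {
	if r.funcs[dir] == nil {
		r.funcs[dir] = map[string]NewAdapterFunc{}
	}
	r.funcs[dir][name] = f
}

// New returns nil when nothing is registered for (name, dir).
func (r *registry) New(name string, dir Direction) Adapter {
	if f, ok := r.funcs[dir][name]; ok {
		return f(name, dir)
	}
	return nil
}

// NewOrDefault falls back to the "basic" adapter for the same direction.
func (r *registry) NewOrDefault(name string, dir Direction) Adapter {
	if a := r.New(name, dir); a != nil {
		return a
	}
	return r.New("basic", dir)
}

func main() {
	r := newRegistry()
	r.Register("basic", Download, func(name string, dir Direction) Adapter {
		return namedAdapter{"basic-v1"}
	})
	// Registering the same name and direction again overrides the first function.
	r.Register("basic", Download, func(name string, dir Direction) Adapter {
		return namedAdapter{"basic-v2"}
	})
	fmt.Println(r.New("tus", Download) == nil)
	fmt.Println(r.NewOrDefault("tus", Download).Name())
}
```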

type Meter

type Meter struct {
	DryRun    bool
	Logger    *tools.SyncWriter
	Direction Direction
	// contains filtered or unexported fields
}

Meter provides a progress bar type output for the TransferQueue. It is given an estimated file count and size up front and tracks the number of files and bytes transferred as well as the number of files and bytes that get skipped because the transfer is unnecessary.

func NewMeter

func NewMeter(cfg *config.Configuration) *Meter

NewMeter creates a new Meter.

func (*Meter) Add

func (m *Meter) Add(size int64)

Add tells the progress meter that a single file of the given size will possibly be transferred. If a file doesn't need to be transferred for some reason, be sure to call Skip(int64) with the same size.

func (*Meter) Finish

func (m *Meter) Finish()

Finish shuts down the Meter.

func (*Meter) FinishTransfer

func (m *Meter) FinishTransfer(name string)

FinishTransfer increments the finished transfer count.

func (*Meter) Flush

func (m *Meter) Flush()

Flush sends the latest progress update, while leaving the meter active.

func (*Meter) LoggerFromEnv

func (m *Meter) LoggerFromEnv(os env) *tools.SyncWriter

func (*Meter) LoggerToFile

func (m *Meter) LoggerToFile(name string) *tools.SyncWriter

func (*Meter) Pause

func (m *Meter) Pause()

Pause stops sending status updates temporarily, until Start() is called again.

func (*Meter) Skip

func (m *Meter) Skip(size int64)

Skip tells the progress meter that a file of size `size` is being skipped because the transfer is unnecessary.
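
The pairing of Add and Skip is easier to see as plain counters. The sketch below is an assumption-laden simplification: `counter` is a hypothetical type, it is not safe for concurrent use, and it says nothing about how Meter renders or throttles its output.

```go
package main

import "fmt"

// counter is a hypothetical, single-goroutine model of the Meter bookkeeping.
type counter struct {
	estimatedFiles, skippedFiles, finishedFiles int
	estimatedBytes, skippedBytes, currentBytes  int64
}

// Add records a file that will possibly be transferred.
func (c *counter) Add(size int64) {
	c.estimatedFiles++
	c.estimatedBytes += size
}

// Skip records that a previously added file of this size needs no transfer.
func (c *counter) Skip(size int64) {
	c.skippedFiles++
	c.skippedBytes += size
}

// TransferBytes records n more bytes moved for some file.
func (c *counter) TransferBytes(n int) { c.currentBytes += int64(n) }

// FinishTransfer records one completed file.
func (c *counter) FinishTransfer() { c.finishedFiles++ }

// Done reports whether every added file was either finished or skipped.
func (c *counter) Done() bool {
	return c.finishedFiles+c.skippedFiles == c.estimatedFiles
}

func main() {
	c := &counter{}
	c.Add(100)
	c.Add(50)
	c.Skip(50) // same size as the Add it cancels out
	c.TransferBytes(100)
	c.FinishTransfer()
	fmt.Println(c.Done(), c.currentBytes+c.skippedBytes == c.estimatedBytes)
}
```

If Skip were called with a different size than the matching Add, the byte totals would never reconcile, which is why the Add documentation asks for the same size.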

func (*Meter) Start

func (m *Meter) Start()

Start begins sending status updates to the optional log file, and stdout.

func (*Meter) StartTransfer

func (m *Meter) StartTransfer(name string)

StartTransfer tells the progress meter that a transferring file is being added to the TransferQueue.

func (*Meter) Throttled

func (m *Meter) Throttled() bool

func (*Meter) TransferBytes

func (m *Meter) TransferBytes(direction, name string, read, total int64, current int)

TransferBytes increments the number of bytes transferred.

func (*Meter) Updates

func (m *Meter) Updates() <-chan *tasklog.Update

type NewAdapterFunc

type NewAdapterFunc func(name string, dir Direction) Adapter

NewAdapterFunc creates new instances of Adapter. Code that wishes to provide new Adapter instances should pass an implementation of this function to RegisterNewAdapterFunc() on a *Manifest. The name and dir arguments provide context in case one func implements many instances.

type ObjectError

type ObjectError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

func (*ObjectError) Error

func (e *ObjectError) Error() string

type Option

type Option func(*TransferQueue)

func DryRun

func DryRun(dryRun bool) Option

func RemoteRef

func RemoteRef(ref *git.Ref) Option

func WithBatchSize

func WithBatchSize(size int) Option

func WithBufferDepth

func WithBufferDepth(depth int) Option

func WithProgress

func WithProgress(m *Meter) Option

func WithProgressCallback

func WithProgressCallback(cb tools.CopyCallback) Option
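
Option has the shape of the functional-options pattern: each constructor returns a closure that mutates the queue being built. The sketch below illustrates the pattern with a hypothetical `queue` struct; its field names and default values are assumptions, not the unexported fields or defaults of TransferQueue.

```go
package main

import "fmt"

// queue is a hypothetical stand-in for TransferQueue.
type queue struct {
	batchSize   int
	bufferDepth int
	dryRun      bool
}

// Option mutates a queue while it is being constructed.
type Option func(*queue)

func DryRun(dryRun bool) Option {
	return func(q *queue) { q.dryRun = dryRun }
}

func WithBatchSize(size int) Option {
	return func(q *queue) { q.batchSize = size }
}

func WithBufferDepth(depth int) Option {
	return func(q *queue) { q.bufferDepth = depth }
}

// newQueue starts from illustrative defaults and applies options in order,
// so a later option overrides an earlier one that sets the same field.
func newQueue(options ...Option) *queue {
	q := &queue{batchSize: 100, bufferDepth: 100}
	for _, opt := range options {
		opt(q)
	}
	return q
}

func main() {
	q := newQueue(WithBatchSize(10), DryRun(true))
	fmt.Println(q.batchSize, q.bufferDepth, q.dryRun)
}
```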

type ProgressCallback

type ProgressCallback func(name string, totalSize, readSoFar int64, readSinceLast int) error

type SSHAdapter

type SSHAdapter struct {
	// contains filtered or unexported fields
}

func (SSHAdapter) Add

func (a SSHAdapter) Add(transfers ...*Transfer) <-chan TransferResult

func (*SSHAdapter) Begin

func (a *SSHAdapter) Begin(cfg AdapterConfig, cb ProgressCallback) error

Begin a new batch of uploads or downloads. Call this first, followed by one or more Add calls. The passed in callback will receive updates on progress.

func (SSHAdapter) Direction

func (a SSHAdapter) Direction() Direction

func (*SSHAdapter) DoTransfer

func (a *SSHAdapter) DoTransfer(ctx interface{}, t *Transfer, cb ProgressCallback, authOkFunc func()) error

DoTransfer performs a single transfer within a worker. ctx is any context returned from WorkerStarting.

func (SSHAdapter) End

func (a SSHAdapter) End()

func (SSHAdapter) Name

func (a SSHAdapter) Name() string

func (*SSHAdapter) Trace

func (a *SSHAdapter) Trace(format string, args ...interface{})

func (*SSHAdapter) WorkerEnding

func (a *SSHAdapter) WorkerEnding(workerNum int, ctx interface{})

WorkerEnding is called when a worker goroutine is shutting down. Implementations can clean up per-worker resources here; the context is as returned from WorkerStarting.

func (*SSHAdapter) WorkerStarting

func (a *SSHAdapter) WorkerStarting(workerNum int) (interface{}, error)

WorkerStarting is called when a worker goroutine starts to process jobs. Implementations can run some startup logic here and return some context if needed.
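
The three worker hooks fit together as a per-goroutine lifecycle: start once, transfer many times with the returned context, end once. A minimal sketch follows, assuming simplified signatures; `workerHooks`, `runWorkers`, `job` and `recordingHooks` are hypothetical names, and the real DoTransfer also takes a ProgressCallback and an authOkFunc.

```go
package main

import (
	"fmt"
	"sync"
)

type job struct{ oid string }

// workerHooks mirrors the three lifecycle methods with simplified signatures.
type workerHooks interface {
	WorkerStarting(workerNum int) (interface{}, error)
	DoTransfer(ctx interface{}, j job) error
	WorkerEnding(workerNum int, ctx interface{})
}

// runWorkers starts n goroutines that share one pre-filled job queue.
func runWorkers(h workerHooks, n int, jobs []job) {
	queue := make(chan job, len(jobs))
	for _, j := range jobs {
		queue <- j
	}
	close(queue)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(num int) {
			defer wg.Done()
			ctx, err := h.WorkerStarting(num)
			if err != nil {
				return // this worker never processes jobs
			}
			defer h.WorkerEnding(num, ctx)
			for j := range queue {
				_ = h.DoTransfer(ctx, j) // a real adapter would report the error
			}
		}(i)
	}
	wg.Wait()
}

// recordingHooks counts how often each hook is called.
type recordingHooks struct {
	mu                          sync.Mutex
	started, ended, transferred int
}

func (r *recordingHooks) WorkerStarting(num int) (interface{}, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.started++
	return fmt.Sprintf("conn-%d", num), nil // per-worker context, e.g. a connection
}

func (r *recordingHooks) DoTransfer(ctx interface{}, j job) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.transferred++
	return nil
}

func (r *recordingHooks) WorkerEnding(num int, ctx interface{}) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.ended++
}

func main() {
	h := &recordingHooks{}
	runWorkers(h, 3, []job{{"a"}, {"b"}, {"c"}, {"d"}, {"e"}})
	fmt.Println(h.started, h.ended, h.transferred)
}
```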

type SSHBatchClient

type SSHBatchClient struct {
	// contains filtered or unexported fields
}

func (*SSHBatchClient) Batch

func (a *SSHBatchClient) Batch(remote string, bReq *batchRequest) (*BatchResponse, error)

func (*SSHBatchClient) MaxRetries

func (a *SSHBatchClient) MaxRetries() int

func (*SSHBatchClient) SetMaxRetries

func (a *SSHBatchClient) SetMaxRetries(n int)

type Transfer

type Transfer struct {
	Name          string       `json:"name,omitempty"`
	Oid           string       `json:"oid,omitempty"`
	Size          int64        `json:"size"`
	Authenticated bool         `json:"authenticated,omitempty"`
	Actions       ActionSet    `json:"actions,omitempty"`
	Links         ActionSet    `json:"_links,omitempty"`
	Error         *ObjectError `json:"error,omitempty"`
	Path          string       `json:"path,omitempty"`
	Missing       bool         `json:"-"`
}

func (*Transfer) Rel

func (t *Transfer) Rel(name string) (*Action, error)

type TransferQueue

type TransferQueue struct {
	// contains filtered or unexported fields
}

TransferQueue organises the wider process of uploading and downloading, including calling the API, passing the actual transfer request to transfer adapters, and dealing with progress, errors and retries.

func NewTransferQueue

func NewTransferQueue(dir Direction, manifest *Manifest, remote string, options ...Option) *TransferQueue

NewTransferQueue builds a TransferQueue; the direction and underlying mechanism are determined by the adapter.

func (*TransferQueue) Add

func (q *TransferQueue) Add(name, path, oid string, size int64, missing bool, err error)

Add adds a *Transfer to the transfer queue. It only increments the amount of waiting the TransferQueue has to do if the *Transfer "t" is new.

If one or more transfers with the same OID have already been added to the *TransferQueue, the given transfer will not be enqueued, but will be sent to any channel created by Watch() once the oldest transfer has completed.

Only one file will be transferred to/from the Path element of the first transfer.
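
The de-duplication rule can be sketched as grouping by OID: only the first transfer for an OID creates work, yet every transfer in the group is reported once that work completes. The `dedupQueue` type and its methods below are hypothetical and ignore batching, retries and concurrency.

```go
package main

import "fmt"

type transfer struct{ Name, Path, Oid string }

// dedupQueue is a hypothetical model of the OID grouping described above.
type dedupQueue struct {
	order []string               // OIDs in first-seen order
	byOid map[string][]*transfer // every transfer added for an OID
}

func newDedupQueue() *dedupQueue {
	return &dedupQueue{byOid: map[string][]*transfer{}}
}

// Add reports whether t caused new work to be enqueued.
func (q *dedupQueue) Add(t *transfer) bool {
	_, seen := q.byOid[t.Oid]
	q.byOid[t.Oid] = append(q.byOid[t.Oid], t)
	if !seen {
		q.order = append(q.order, t.Oid)
	}
	return !seen
}

// Complete simulates one physical transfer per OID, using the Path of the
// first transfer, and returns every transfer that watchers would be told about.
func (q *dedupQueue) Complete() (paths []string, watched []*transfer) {
	for _, oid := range q.order {
		group := q.byOid[oid]
		paths = append(paths, group[0].Path)
		watched = append(watched, group...)
	}
	return paths, watched
}

func main() {
	q := newDedupQueue()
	fmt.Println(q.Add(&transfer{"a.bin", "dir1/a.bin", "oid1"}))
	fmt.Println(q.Add(&transfer{"copy.bin", "dir2/copy.bin", "oid1"}))
	fmt.Println(q.Add(&transfer{"b.bin", "dir1/b.bin", "oid2"}))
	paths, watched := q.Complete()
	fmt.Println(len(paths), len(watched))
}
```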

func (*TransferQueue) BatchSize

func (q *TransferQueue) BatchSize() int

BatchSize returns the batch size of the receiving *TransferQueue, or, the number of transfers to accept before beginning work on them.

func (*TransferQueue) Errors

func (q *TransferQueue) Errors() []error

Errors returns any errors encountered during transfer.

func (*TransferQueue) Skip

func (q *TransferQueue) Skip(size int64)

func (*TransferQueue) Wait

func (q *TransferQueue) Wait()

Wait waits for the queue to finish processing all transfers. Once Wait is called, Add will no longer add transfers to the queue. Any failed transfers will be automatically retried once.

func (*TransferQueue) Watch

func (q *TransferQueue) Watch() chan *Transfer

Watch returns a channel where the queue will write the value of each transfer as it completes. If multiple transfers exist with the same OID, they will all be recorded here, even though only one actual transfer took place. The channel will be closed when the queue finishes processing.

type TransferResult

type TransferResult struct {
	Transfer *Transfer
	// This will be non-nil if there was an error transferring this item
	Error error
}

Result of a transfer returned through CompletionChannel()
