pipeline

package
v0.0.0-...-51f9457 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2021 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

Package pipeline implements the pipeline to efficiently archive file sets to an isolated server as fast as possible.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func PushDirectory

func PushDirectory(a *Archiver, root, relDir string) (map[*PendingItem]PushedDirectoryItem, map[string]string, error)

PushDirectory walks a directory at root and pushes the files in it. relDir is a relative directory to offset relative paths against the root directory. For symlink that points to another file under the same root (called *in-tree*), the symlink is preserved. Otherwise, the symlink is followed (*out-of-tree*), and is pushed as an ordinary file.

It does not return until all the pushed items are hashed, which is done asynchornously.

The returned files are in a map from its relative path to PushedDirectoryItem. The returned in-tree symlinks are in a map from its relative path to the relative path it points to.

Types

type Archiver

type Archiver struct {
	// contains filtered or unexported fields
}

Archiver is an high level interface to an isolatedclient.Client.

Uses a 4 stages pipeline, each doing work concurrently:

  • Deduplicating similar requests or known server hot cache hits.
  • Hashing files.
  • Batched cache hit lookups on the server.
  • Uploading cache misses.

func NewArchiver

func NewArchiver(ctx context.Context, c *isolatedclient.Client, out io.Writer) *Archiver

NewArchiver returns a thread-safe Archiver instance.

If not nil, out will contain tty-oriented progress information.

ctx will be used for logging.

func (*Archiver) Close

func (a *Archiver) Close() error

Close waits for all pending files to be done. If an error occurred during processing, it is returned.

func (*Archiver) Hash

func (a *Archiver) Hash() crypto.Hash

Hash returns the hashing algorithm used by this archiver.

func (*Archiver) Push

func (a *Archiver) Push(displayName string, source isolatedclient.Source, priority int64) *PendingItem

Push schedules item upload to the isolate server. Smaller priority value means earlier processing.

func (*Archiver) PushFile

func (a *Archiver) PushFile(displayName, path string, priority int64) *PendingItem

PushFile schedules file upload to the isolate server. Smaller priority value means earlier processing.

func (*Archiver) Stats

func (a *Archiver) Stats() *Stats

Stats returns a copy of the statistics.

type PendingItem

type PendingItem struct {
	// Immutable.
	DisplayName string // Name to use to qualify this item
	// contains filtered or unexported fields
}

PendingItem is an item being processed.

It is caried over from pipeline stage to stage to do processing on it.

func (*PendingItem) Digest

func (i *PendingItem) Digest() isolated.HexDigest

Digest returns the calculated digest once calculated, empty otherwise.

func (*PendingItem) Error

func (i *PendingItem) Error() error

Error returns any error that occurred for this item if any.

func (*PendingItem) SetErr

func (i *PendingItem) SetErr(err error)

SetErr forcibly set an item as failed. Normally not used by callers.

func (*PendingItem) WaitForHashed

func (i *PendingItem) WaitForHashed()

WaitForHashed hangs until the item hash is known.

type PushedDirectoryItem

type PushedDirectoryItem struct {
	// Absolute path to the item.
	FullPath string

	// Relativie path to the item within the directory.
	RelPath string

	// FileInfo of the item.
	Info os.FileInfo
}

PushedDirectoryItem represents a file within the directory being pushed.

type Stats

type Stats struct {
	Hits   []units.Size  // Bytes; each item is immutable.
	Pushed []*UploadStat // Misses; each item is immutable.
}

Stats is statistics from the Archiver.

func (*Stats) PackedHits

func (s *Stats) PackedHits() ([]byte, error)

PackedHits returns the size of hit items in packed format.

func (*Stats) PackedMisses

func (s *Stats) PackedMisses() ([]byte, error)

PackedMisses returns size of missed items in packed format.

func (*Stats) TotalBytesHits

func (s *Stats) TotalBytesHits() units.Size

TotalBytesHits is the number of bytes not uploaded due to cache hits on the server.

func (*Stats) TotalBytesPushed

func (s *Stats) TotalBytesPushed() units.Size

TotalBytesPushed returns the sum of bytes uploaded.

func (*Stats) TotalHits

func (s *Stats) TotalHits() int

TotalHits is the number of cache hits on the server.

func (*Stats) TotalMisses

func (s *Stats) TotalMisses() int

TotalMisses returns the number of cache misses on the server.

type UploadStat

type UploadStat struct {
	Duration time.Duration
	Size     units.Size
	Name     string
}

UploadStat is the statistic for a single upload.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL