runner

package
v0.0.0-...-598a827

Published: Sep 28, 2021 License: Apache-2.0 Imports: 60 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultStudioRMQExchange = "StudioML.topic"

DefaultStudioRMQExchange is the topic name used within RabbitMQ for StudioML based message queuing

Variables

This section is empty.

Functions

func CacheProbe

func CacheProbe(key string) bool

CacheProbe can be used to test the validity of the cache for a previously cached item.

func ClearObjStore

func ClearObjStore() (err kv.Error)

ClearObjStore can be used by clients to erase the contents of the object store cache

func ExtractMergeDoc

func ExtractMergeDoc(x1, x2 interface{}) (results string, err kv.Error)

ExtractMergeDoc takes two JSON-marshalable values, x1 and x2, performs a merge, and returns the result

func GoGetConst

func GoGetConst(dir string, constName string) (v [][]string, err kv.Error)

GoGetConst will retrieve data structures from source code within the code directories that can contain useful information for utilities visiting the code for testing purposes. It is used mainly to retrieve command line parameters that packages contain for use during testing, so that when tests are run by external, application-neutral software the code under test can parameterize itself.
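
Example

A hedged, in-package sketch (placed in a _test.go file) showing the call shape; the directory and constant name below are illustrative assumptions only.

package runner

import "fmt"

func ExampleGoGetConst() {
	// Scan the source tree below the current directory for a named constant.
	values, err := GoGetConst(".", "DefaultStudioRMQExchange")
	if err != nil {
		fmt.Println(err)
		return
	}
	// Each entry is a slice of strings describing one constant that was located.
	for _, v := range values {
		fmt.Println(v)
	}
}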

func InitObjStore

func InitObjStore(ctx context.Context, backing string, size int64, removedC chan os.FileInfo, errorC chan kv.Error) (triggerC chan<- struct{}, err kv.Error)

InitObjStore sets up the backing store for our object store cache. The size specified can be any byte amount.

The triggerC channel is functional when the err value is nil; it can be used to manually trigger the disk caching sub system
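
Example

A hedged, in-package sketch of wiring up the object store cache; the backing directory, the 10 GiB size, the channel buffering, and the kv import path are all assumptions made for illustration.

package runner

import (
	"context"
	"fmt"
	"os"

	"github.com/jjeffery/kv" // assumed import path for the kv error package used in these signatures
)

func ExampleInitObjStore() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Channels the cache sub system uses to report evicted files and errors,
	// buffered here so the sketch does not need dedicated consumers.
	removedC := make(chan os.FileInfo, 16)
	errorC := make(chan kv.Error, 16)

	// Back the cache with an illustrative directory and a 10 GiB budget.
	triggerC, err := InitObjStore(ctx, "/tmp/runner-cache", 10*1024*1024*1024, removedC, errorC)
	if err != nil {
		fmt.Println(err)
		return
	}

	// triggerC is only functional when err is nil and can be used to manually
	// prompt a disk cache maintenance pass.
	select {
	case triggerC <- struct{}{}:
	default:
	}
}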

func IsInTest

func IsInTest() (isTest bool)

IsInTest will examine the OS arguments passed into the software being run to detect if the go test framework is present. It varies from the cudaInTest approach in that it will work if the tests were enabled in another module.

func JSONEditor

func JSONEditor(oldDoc string, directives []string) (result string, err kv.Error)

JSONEditor will accept a source JSON document and an array of change edits for the source document, and will process them as either RFC 7386 or RFC 6902 edits, whichever they validate as.
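
Example

A hedged sketch of applying a single RFC 7386 merge-patch directive; the documents themselves are illustrative.

package runner

import "fmt"

func ExampleJSONEditor() {
	src := `{"experiment": {"max_duration": "20m", "lifetime": "30m"}}`

	// One RFC 7386 merge patch overriding a single field of the source document.
	directives := []string{
		`{"experiment": {"max_duration": "40m"}}`,
	}

	result, err := JSONEditor(src, directives)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(result)
}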

func MergeExperiment

func MergeExperiment(x1, x2 interface{}) (interface{}, kv.Error)

MergeExperiment merges the two JSON-marshalable values x1 and x2, preferring x1 over x2 except where x1 and x2 are JSON objects, in which case the keys from both objects are included and their values merged recursively.

It returns an error if x1 or x2 cannot be JSON-marshaled.
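
Example

A hedged sketch of the merge semantics described above, using plain maps as the JSON-marshalable inputs.

package runner

import "fmt"

func ExampleMergeExperiment() {
	x1 := map[string]interface{}{"gpus": 1, "env": map[string]interface{}{"A": "1"}}
	x2 := map[string]interface{}{"gpus": 2, "env": map[string]interface{}{"B": "2"}, "cpus": 4}

	merged, err := MergeExperiment(x1, x2)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Scalars prefer x1 (gpus == 1), objects are merged (env carries both A and B),
	// and keys present only in x2 are retained (cpus == 4).
	fmt.Println(merged)
}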

func NewLocalStorage

func NewLocalStorage() (s *localStorage, err kv.Error)

NewLocalStorage is used to allocate and initialize a struct that acts as a receiver

func NewObjStore

func NewObjStore(ctx context.Context, spec *StoreOpts, errorC chan kv.Error) (oStore *objStore, err kv.Error)

NewObjStore is used to instantiate an object store for the runner that includes a cache

func NewPrometheusClient

func NewPrometheusClient(url string) (cli *prometheusClient)

NewPrometheusClient will instantiate the structure used to communicate with a remote prometheus endpoint

func ObjStoreFootPrint

func ObjStoreFootPrint() (max int64)

ObjStoreFootPrint can be used to determine the current footprint of the artifact cache

func ParseBytes

func ParseBytes(val string) (bytes uint64, err error)

ParseBytes returns the byte count represented by the input string.

This function uses the go-humanize library from GitHub.

Typical inputs include, by way of example, '6gb', '6 GB', and '6 GiB'. Both SI and IEC sizes are supported. For more information please review https://github.com/dustin/go-humanize/blob/master/bytes.go
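
Example

A small sketch showing ParseBytes accepting both SI and IEC inputs.

package runner

import "fmt"

func ExampleParseBytes() {
	for _, in := range []string{"6gb", "6 GB", "6 GiB"} {
		n, err := ParseBytes(in)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("%s -> %d bytes\n", in, n)
	}
}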

func PingRMQServer

func PingRMQServer(amqpURL string, amqpMgtURL string) (err kv.Error)

PingRMQServer is used to validate that a RabbitMQ server is alive and active on the administration port.

amqpURL is the standard client AMQP URI supplied by a caller. It will be parsed and converted into the administration endpoint and then tested.
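
Example

A hedged sketch; the two URLs are placeholders for a deployment's own RabbitMQ client and management endpoints.

package runner

import "fmt"

func ExamplePingRMQServer() {
	amqpURL := "amqp://guest:guest@localhost:5672/"
	amqpMgtURL := "http://guest:guest@localhost:15672"

	if err := PingRMQServer(amqpURL, amqpMgtURL); err != nil {
		fmt.Println("rabbitMQ not reachable:", err)
		return
	}
	fmt.Println("rabbitMQ administration endpoint is alive")
}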

func Reverse

func Reverse(in string) (reversed string)

Types

type ArtifactCache

type ArtifactCache struct {
	sync.Mutex

	// This can be used by the application layer to receive diagnostic and other information
	// about errors occurring inside the caching tracker etc and surface these errors to
	// the logging system
	ErrorC chan kv.Error
	// contains filtered or unexported fields
}

ArtifactCache is used to encapsulate and store hashes, typically file hashes, and prevent duplicated uploads from occurring needlessly

func NewArtifactCache

func NewArtifactCache() (cache *ArtifactCache)

NewArtifactCache initializes a hash tracker for artifact related files and passes it back to the caller. The tracking structure can be used to track files that have already been downloaded / uploaded and also includes a channel that can be used to receive error notifications
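
Example

A hedged sketch showing the cache being created, its error channel drained, and the tracker closed when no longer needed.

package runner

import "fmt"

func ExampleNewArtifactCache() {
	cache := NewArtifactCache()
	defer cache.Close()

	// Surface diagnostics raised by the cache tracker to the caller's logging.
	go func() {
		for err := range cache.ErrorC {
			fmt.Println("artifact cache:", err)
		}
	}()

	// cache.Fetch, cache.Hash and the other methods can now be used with
	// duplicate uploads and downloads suppressed by the tracker.
}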

func (*ArtifactCache) Close

func (cache *ArtifactCache) Close()

Close will clean up the cache of hashes and close the error reporting channel associated with the cache tracker

func (*ArtifactCache) Fetch

func (cache *ArtifactCache) Fetch(ctx context.Context, art *request.Artifact, projectId string, group string, maxBytes int64, env map[string]string, dir string) (size int64, warns []kv.Error, err kv.Error)

Fetch can be used to retrieve an artifact from a storage layer implementation, while passing through the lens of a caching filter that prevents unneeded downloads.

func (*ArtifactCache) Hash

func (cache *ArtifactCache) Hash(ctx context.Context, art *request.Artifact, projectId string, group string, env map[string]string, dir string) (hash string, err kv.Error)

Hash is used to obtain the hash of an artifact from the backing store being used by the storage implementation

func (*ArtifactCache) Local

func (cache *ArtifactCache) Local(group string, dir string, file string) (fn string, err kv.Error)

Local returns the local disk based file name for the artifact's expanded archive files

func (*ArtifactCache) Restore

func (cache *ArtifactCache) Restore(ctx context.Context, art *request.Artifact, projectId string, group string, env map[string]string, dir string) (uploaded bool, warns []kv.Error, err kv.Error)

Restore the artifacts that have been marked mutable and that have changed

type Backoffs

type Backoffs struct {
	// contains filtered or unexported fields
}

Backoffs uses a cache with a TTL on the cache items to maintain a set of blocking directives for resources, where the cache expiry time is the applicable time for the blocker

func GetBackoffs

func GetBackoffs() (b *Backoffs)

GetBackoffs retrieves a reference to a singleton of the Backoffs structure

func (*Backoffs) Get

func (b *Backoffs) Get(k string) (expires time.Time, isPresent bool)

Get retrieves a blocker's current expiry time if one exists for the named resource

func (*Backoffs) Set

func (b *Backoffs) Set(k string, d time.Duration)

Set will add a blocker for a named resource, but only if there is no blocking timer already in effect for the resource
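
Example

A hedged sketch of using the Backoffs singleton to rate limit access to a named resource; the resource key and duration are illustrative.

package runner

import (
	"fmt"
	"time"
)

func ExampleBackoffs() {
	backoffs := GetBackoffs()

	// Block the resource for a minute, but only if no blocker is already in effect.
	backoffs.Set("rmq://queue-1", time.Minute)

	if expires, isPresent := backoffs.Get("rmq://queue-1"); isPresent {
		fmt.Println("blocked until", expires.Format(time.RFC3339))
	}
}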

type LocalQueue

type LocalQueue struct {
	RootDir string // full file path to root queues "server" directory
	// contains filtered or unexported fields
}

LocalQueue "project" is essentially a local root directory containing queue sub-directories.

func NewLocalQueue

func NewLocalQueue(root string, w wrapper.Wrapper, logger *log.Logger) (fq *LocalQueue)

func (*LocalQueue) Exists

func (fq *LocalQueue) Exists(ctx context.Context, subscription string) (exists bool, err kv.Error)

Exists will check that the file queue named "subscription" exists as a sub-directory under the root "server" directory.

func (*LocalQueue) Get

func (fq *LocalQueue) Get(subscription string) (Msg []byte, MsgID string, err kv.Error)

func (*LocalQueue) GetKnown

func (fq *LocalQueue) GetKnown(ctx context.Context, matcher *regexp.Regexp, mismatcher *regexp.Regexp) (found map[string]task.QueueDesc, err kv.Error)

func (*LocalQueue) GetShortQName

func (fq *LocalQueue) GetShortQName(qt *task.QueueTask) (shortName string, err kv.Error)

GetShortQName is useful for storing queue specific information in collections etc

func (*LocalQueue) HasWork

func (fq *LocalQueue) HasWork(ctx context.Context, subscription string) (hasWork bool, err kv.Error)

HasWork will look at the local file queue to see if there is any pending work. The function is called in an attempt to see if there is any point in processing new work without a lot of overhead.

func (*LocalQueue) IsEncrypted

func (fq *LocalQueue) IsEncrypted() (encrypted bool)

func (*LocalQueue) Publish

func (fq *LocalQueue) Publish(queueName string, contentType string, msg []byte) (err kv.Error)

func (*LocalQueue) Refresh

func (fq *LocalQueue) Refresh(ctx context.Context, matcher *regexp.Regexp, mismatcher *regexp.Regexp) (known map[string]interface{}, err kv.Error)

Refresh will examine the local file queues "server" and extract a list of the queues that relate to StudioML work.

func (*LocalQueue) Responder

func (fq *LocalQueue) Responder(ctx context.Context, subscription string, encryptKey *rsa.PublicKey) (sender chan *runnerReports.Report, err kv.Error)

Responder is used to open a connection to an existing response queue if one was made available and also to provision a channel into which the runner can place report messages

func (*LocalQueue) Work

func (fq *LocalQueue) Work(ctx context.Context, qt *task.QueueTask) (msgProcessed bool, resource *server.Resource, err kv.Error)

Work will connect to the FileQueue "server" identified in the receiver, fq, and will see if any work can be found on the queue identified by the go runner subscription and present work to the handler for processing

type RabbitMQ

type RabbitMQ struct {
	Identity string // A URL stripped of the user name and password, making it safe for logging etc
	// contains filtered or unexported fields
}

RabbitMQ encapsulates the configuration and extant client for a queue server

func NewRabbitMQ

func NewRabbitMQ(queueURI string, manageURI string, creds string, w wrapper.Wrapper, logger *log.Logger) (rmq *RabbitMQ, err kv.Error)

NewRabbitMQ takes the URI identifying a server and will configure the client data structure needed to call methods against the server

The order of these two parameters needs to reflect the key, value pair that the GetKnown function returns

func (*RabbitMQ) AttachMgmt

func (rmq *RabbitMQ) AttachMgmt(timeout time.Duration) (mgmt *rh.Client, err kv.Error)

func (*RabbitMQ) Exists

func (rmq *RabbitMQ) Exists(ctx context.Context, subscription string) (exists bool, err kv.Error)

Exists will connect to the rabbitMQ server identified in the receiver, rmq, and will query it to see if the queue identified by the studio go runner subscription exists

func (*RabbitMQ) GetKnown

func (rmq *RabbitMQ) GetKnown(ctx context.Context, matcher *regexp.Regexp, mismatcher *regexp.Regexp) (found map[string]task.QueueDesc, err kv.Error)

GetKnown will connect to the rabbitMQ server identified in the receiver, rmq, and will query it for any queues that match the matcher regular expression

found contains a map whose keys are uncredentialed URLs and whose values are the user name and password for each URL

The URL path will be the vhost and the queue name

func (*RabbitMQ) GetShortQName

func (rmq *RabbitMQ) GetShortQName(qt *task.QueueTask) (shortName string, err kv.Error)

GetShortQName is useful for storing queue specific information in collections etc

func (*RabbitMQ) HasWork

func (rmq *RabbitMQ) HasWork(ctx context.Context, subscription string) (hasWork bool, err kv.Error)

HasWork will look at the RabbitMQ queue to see if there is any pending work. The function is called in an attempt to see if there is any point in processing new work without a lot of overhead. In the case of RabbitMQ at the moment we always assume there is work.

func (*RabbitMQ) IsEncrypted

func (rmq *RabbitMQ) IsEncrypted() (encrypted bool)

func (*RabbitMQ) Publish

func (rmq *RabbitMQ) Publish(key string, contentType string, msg []byte) (err kv.Error)

Publish is a shim method for tests to use for sending requeues to a queue

func (*RabbitMQ) QueueDeclare

func (rmq *RabbitMQ) QueueDeclare(qName string) (err kv.Error)

QueueDeclare is a shim method for creating a queue within the rabbitMQ server defined by the receiver

func (*RabbitMQ) QueueDestroy

func (rmq *RabbitMQ) QueueDestroy(qName string) (err kv.Error)

QueueDestroy is a shim method for deleting a queue within the rabbitMQ server defined by the receiver

func (*RabbitMQ) Refresh

func (rmq *RabbitMQ) Refresh(ctx context.Context, matcher *regexp.Regexp, mismatcher *regexp.Regexp) (known map[string]interface{}, err kv.Error)

Refresh will examine the RMQ exchange and extract a list of the queues that relate to StudioML work.

func (*RabbitMQ) Responder

func (rmq *RabbitMQ) Responder(ctx context.Context, subscription string, encryptKey *rsa.PublicKey) (sender chan *runnerReports.Report, err kv.Error)

Responder is used to open a connection to an existing response queue if one was made available and also to provision a channel into which the runner can place report messages

func (*RabbitMQ) URL

func (rmq *RabbitMQ) URL() (urlString string)

func (*RabbitMQ) Work

func (rmq *RabbitMQ) Work(ctx context.Context, qt *task.QueueTask) (msgProcessed bool, resource *server.Resource, err kv.Error)

Work will connect to the rabbitMQ server identified in the receiver, rmq, and will see if any work can be found on the queue identified by the go runner subscription and present work to the handler for processing

type Singularity

type Singularity struct {
	Request   *request.Request
	BaseDir   string
	BaseImage string
}

Singularity is a data structure that contains the description of a singularity container resource

func NewSingularity

func NewSingularity(rqst *request.Request, dir string) (sing *Singularity, err kv.Error)

NewSingularity is used to instantiate a singularity resource based upon a request, typically sent across a go channel or similar

func (*Singularity) Close

func (*Singularity) Close() (err kv.Error)

Close is a stub method for termination of a singularity resource

func (*Singularity) Make

func (s *Singularity) Make(alloc *resources.Allocated, e interface{}) (err kv.Error)

Make is used to write a script file generated for the specific TF tasks studioml has sent, which retrieves any python packages etc and then runs the task

func (*Singularity) Run

func (s *Singularity) Run(ctx context.Context, refresh map[string]request.Artifact) (err kv.Error)

Run will use a generated script file and will run it to completion while marshalling results and files from the computation. Run is a blocking call and will only return upon completion or termination of the process it starts

type Storage

type Storage interface {
	// Gather will retrieve contents of the named storage object using a prefix treating any items retrieved as individual files, invokes Fetch
	//
	Gather(ctx context.Context, keyPrefix string, outputDir string, maxBytes int64, tap io.Writer, failFast bool) (size int64, warnings []kv.Error, err kv.Error)

	// Fetch will retrieve contents of the named storage object and optionally unpack it into the
	// user specified output directory
	//
	Fetch(ctx context.Context, name string, unpack bool, output string, maxBytes int64, tap io.Writer) (size int64, warnings []kv.Error, err kv.Error)

	// Hoard will take a number of files for upload, deduplication is implemented outside of this interface
	//
	Hoard(ctx context.Context, srcDir string, keyPrefix string) (warnings []kv.Error, err kv.Error)

	// Deposit is a directory archive and upload, deduplication is implemented outside of this interface
	//
	Deposit(ctx context.Context, src string, dest string) (warnings []kv.Error, err kv.Error)

	// Hash can be used to retrieve the hash of the contents of the file.  The hash is
	// retrieved not computed and so is a lightweight operation common to both S3 and Google Storage.
	// The hash on some storage platforms is not a plain MD5 but uses multiple hashes from file
	// segments to increase the speed of hashing and also to reflect the multipart download
	// processing that was used for the file, for a full explanation please see
	// https://stackoverflow.com/questions/12186993/what-is-the-algorithm-to-compute-the-amazon-s3-etag-for-a-file-larger-than-5gb
	//
	Hash(ctx context.Context, name string) (hash string, err kv.Error)

	Close()
}

Storage defines an interface for implementations of a studioml artifact store

func NewStorage

func NewStorage(ctx context.Context, spec *StoreOpts) (stor Storage, err kv.Error)

NewStorage is used to create a receiver for a storage implementation
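
Example

A hedged sketch of fetching one artifact through the Storage interface. spec is assumed to have been populated (including its Art field) from a studioml request; the object name, size cap and output directory are illustrative, and a nil tap writer is assumed to be acceptable.

package runner

import (
	"context"
	"fmt"
)

func exampleFetchArtifact(ctx context.Context, spec *StoreOpts) {
	stor, err := NewStorage(ctx, spec)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer stor.Close()

	// Retrieve and unpack a single object into a local output directory.
	size, warns, err := stor.Fetch(ctx, "output.tar", true, "./output", 1<<30, nil)
	for _, warn := range warns {
		fmt.Println("warning:", warn)
	}
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("fetched", size, "bytes")
}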

type StoreOpts

type StoreOpts struct {
	Art       *request.Artifact
	ProjectID string
	Group     string
	Env       map[string]string
	Validate  bool
}

StoreOpts is used to encapsulate a storage implementation with the runner and studioml data needed

type TimeEMA

type TimeEMA struct {
	sync.Mutex
	// contains filtered or unexported fields
}

TimeEMA is used to store exponential moving averages for a time duration

func NewTimeEMA

func NewTimeEMA(windows []time.Duration, initial time.Duration) (emas *TimeEMA)

NewTimeEMA creates a new exponential moving average of a time duration for a set of time windows with an initial execution time duration set

func (*TimeEMA) Get

func (avgs *TimeEMA) Get(window time.Duration) (avg time.Duration, wasPresent bool)

Get retrieves a single time duration moving average for a specified window of time

func (*TimeEMA) Keys

func (avgs *TimeEMA) Keys() (keys []time.Duration)

Keys can be used to retrieve a list of the moving average periods across which the average is calculated

func (*TimeEMA) Update

func (avgs *TimeEMA) Update(value time.Duration)

Update is used to update the moving average based on a new duration that has been observed
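
Example

A hedged sketch tracking execution times across two averaging windows; the windows, the seed duration, and the observations are illustrative.

package runner

import (
	"fmt"
	"time"
)

func ExampleTimeEMA() {
	avgs := NewTimeEMA([]time.Duration{time.Minute, 10 * time.Minute}, 30*time.Second)

	// Feed in observed task durations as they complete.
	avgs.Update(45 * time.Second)
	avgs.Update(20 * time.Second)

	for _, window := range avgs.Keys() {
		if avg, ok := avgs.Get(window); ok {
			fmt.Printf("window %v -> moving average %v\n", window, avg)
		}
	}
}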

type Trigger

type Trigger struct {
	T <-chan struct{}
	C chan time.Time
	// contains filtered or unexported fields
}

Trigger is a data structure that encapsulates a timer and a channel which together are used in turn to send messages to a downstream go channel. The main Trigger use case is to allow a regular action to be scheduled via a timer and also to allow unit tests, for example, to activate the same action.

func NewTrigger

func NewTrigger(triggerC <-chan struct{}, d time.Duration, j jitterbug.Jitter) (t *Trigger)

NewTrigger accepts a timer and a channel that together can be used to send messages into a channel that is encapsulated within the returned t data structure

func (*Trigger) Stop

func (t *Trigger) Stop()

Stop will close the internal channel used to signal termination to the internally running go routine that processes the timer and the trigger channel

type VirtualEnv

type VirtualEnv struct {
	Request *request.Request
	Script  string

	ResponseQ chan<- *runnerReports.Report
	// contains filtered or unexported fields
}

VirtualEnv encapsulates the context from which a python virtual environment is to be instantiated, including items such as the list of pip installables that should be loaded and the shell script to run.

func NewVirtualEnv

func NewVirtualEnv(rqst *request.Request, dir string, uniqueID string, responseQ chan<- *runnerReports.Report) (env *VirtualEnv, err kv.Error)

NewVirtualEnv builds the VirtualEnv data structure from data received across the wire from a studioml client.

func (*VirtualEnv) Close

func (*VirtualEnv) Close() (err kv.Error)

Close is used to close any resources which the encapsulated VirtualEnv may have consumed.

func (*VirtualEnv) Make

func (p *VirtualEnv) Make(alloc *resources.Allocated, e interface{}) (err kv.Error)

Make is used to write a script file generated for the specific TF tasks studioml has sent, which retrieves any python packages etc and then runs the task

func (*VirtualEnv) Run

func (p *VirtualEnv) Run(ctx context.Context, refresh map[string]request.Artifact) (err kv.Error)

Run will use a generated script file and will run it to completion while marshalling results and files from the computation. Run is a blocking call and will only return upon completion or termination of the process it starts. Run is called by the processor runScript receiver.
