Documentation ¶
Index ¶
- Constants
- func CacheProbe(key string) bool
- func ClearObjStore() (err kv.Error)
- func ExtractMergeDoc(x1, x2 interface{}) (results string, err kv.Error)
- func GoGetConst(dir string, constName string) (v [][]string, err kv.Error)
- func InitObjStore(ctx context.Context, backing string, size int64, removedC chan os.FileInfo, ...) (triggerC chan<- struct{}, err kv.Error)
- func IsInTest() (isTest bool)
- func JSONEditor(oldDoc string, directives []string) (result string, err kv.Error)
- func MergeExperiment(x1, x2 interface{}) (interface{}, kv.Error)
- func NewLocalStorage() (s *localStorage, err kv.Error)
- func NewObjStore(ctx context.Context, spec *StoreOpts, errorC chan kv.Error) (oStore *objStore, err kv.Error)
- func NewPrometheusClient(url string) (cli *prometheusClient)
- func ObjStoreFootPrint() (max int64)
- func ParseBytes(val string) (bytes uint64, err error)
- func PingRMQServer(amqpURL string, amqpMgtURL string) (err kv.Error)
- func Reverse(in string) (reversed string)
- type ArtifactCache
- func (cache *ArtifactCache) Close()
- func (cache *ArtifactCache) Fetch(ctx context.Context, art *request.Artifact, projectId string, group string, ...) (size int64, warns []kv.Error, err kv.Error)
- func (cache *ArtifactCache) Hash(ctx context.Context, art *request.Artifact, projectId string, group string, ...) (hash string, err kv.Error)
- func (cache *ArtifactCache) Local(group string, dir string, file string) (fn string, err kv.Error)
- func (cache *ArtifactCache) Restore(ctx context.Context, art *request.Artifact, projectId string, group string, ...) (uploaded bool, warns []kv.Error, err kv.Error)
- type Backoffs
- type LocalQueue
- func (fq *LocalQueue) Exists(ctx context.Context, subscription string) (exists bool, err kv.Error)
- func (fq *LocalQueue) Get(subscription string) (Msg []byte, MsgID string, err kv.Error)
- func (fq *LocalQueue) GetKnown(ctx context.Context, matcher *regexp.Regexp, mismatcher *regexp.Regexp) (found map[string]task.QueueDesc, err kv.Error)
- func (fq *LocalQueue) GetShortQName(qt *task.QueueTask) (shortName string, err kv.Error)
- func (fq *LocalQueue) HasWork(ctx context.Context, subscription string) (hasWork bool, err kv.Error)
- func (fq *LocalQueue) IsEncrypted() (encrypted bool)
- func (fq *LocalQueue) Publish(queueName string, contentType string, msg []byte) (err kv.Error)
- func (fq *LocalQueue) Refresh(ctx context.Context, matcher *regexp.Regexp, mismatcher *regexp.Regexp) (known map[string]interface{}, err kv.Error)
- func (fq *LocalQueue) Responder(ctx context.Context, subscription string, encryptKey *rsa.PublicKey) (sender chan *runnerReports.Report, err kv.Error)
- func (fq *LocalQueue) Work(ctx context.Context, qt *task.QueueTask) (msgProcessed bool, resource *server.Resource, err kv.Error)
- type RabbitMQ
- func (rmq *RabbitMQ) AttachMgmt(timeout time.Duration) (mgmt *rh.Client, err kv.Error)
- func (rmq *RabbitMQ) Exists(ctx context.Context, subscription string) (exists bool, err kv.Error)
- func (rmq *RabbitMQ) GetKnown(ctx context.Context, matcher *regexp.Regexp, mismatcher *regexp.Regexp) (found map[string]task.QueueDesc, err kv.Error)
- func (rmq *RabbitMQ) GetShortQName(qt *task.QueueTask) (shortName string, err kv.Error)
- func (rmq *RabbitMQ) HasWork(ctx context.Context, subscription string) (hasWork bool, err kv.Error)
- func (rmq *RabbitMQ) IsEncrypted() (encrypted bool)
- func (rmq *RabbitMQ) Publish(key string, contentType string, msg []byte) (err kv.Error)
- func (rmq *RabbitMQ) QueueDeclare(qName string) (err kv.Error)
- func (rmq *RabbitMQ) QueueDestroy(qName string) (err kv.Error)
- func (rmq *RabbitMQ) Refresh(ctx context.Context, matcher *regexp.Regexp, mismatcher *regexp.Regexp) (known map[string]interface{}, err kv.Error)
- func (rmq *RabbitMQ) Responder(ctx context.Context, subscription string, encryptKey *rsa.PublicKey) (sender chan *runnerReports.Report, err kv.Error)
- func (rmq *RabbitMQ) URL() (urlString string)
- func (rmq *RabbitMQ) Work(ctx context.Context, qt *task.QueueTask) (msgProcessed bool, resource *server.Resource, err kv.Error)
- type Singularity
- type Storage
- type StoreOpts
- type TimeEMA
- type Trigger
- type VirtualEnv
Constants ¶
const DefaultStudioRMQExchange = "StudioML.topic"
DefaultStudioRMQExchange is the topic name used within RabbitMQ for StudioML based message queuing
Variables ¶
This section is empty.
Functions ¶
func CacheProbe ¶
CacheProbe can be used to test the validity of the cache for a previously cached item.
func ClearObjStore ¶
ClearObjStore can be used by clients to erase the contents of the object store cache
func ExtractMergeDoc ¶
ExtractMergeDoc uses two JSON-marshalable values x1 and x2 performing a merge and returns the results
func GoGetConst ¶
GoGetConst will retrieve data structures from source code within the code directories that can contain useful information to utilities visiting the code for testing purposes. It is used mainly to retrieve command line parameters used during testing that packages contain so that when tests are run by external application neutral software the code under test can parameterize itself.
func InitObjStore ¶
func InitObjStore(ctx context.Context, backing string, size int64, removedC chan os.FileInfo, errorC chan kv.Error) (triggerC chan<- struct{}, err kv.Error)
InitObjStore sets up the backing store for our object store cache. The size specified can be any byte amount.
The triggerC channel is functional when the err value is nil, this channel can be used to manually trigger the disk caching sub system
func IsInTest ¶
func IsInTest() (isTest bool)
IsInTest will examine the OS arguments passed into the software being run to detect if the go test framework is present. If varies from the cudaInTest approach in that it will work if the tests were enabled in another module
func JSONEditor ¶
JSONEditor will accept a source JSON document and an array of change edits for the source document and will process them as either RFC7386, or RFC6902 edits if they validate as either.
func MergeExperiment ¶
MergeExperiment merges the two JSON-marshalable values x1 and x2, preferring x1 over x2 except where x1 and x2 are JSON objects, in which case the keys from both objects are included and their values merged recursively.
It returns an error if x1 or x2 cannot be JSON-marshaled.
func NewLocalStorage ¶
NewLocalStorage is used to allocate and initialize a struct that acts as a receiver
func NewObjStore ¶
func NewObjStore(ctx context.Context, spec *StoreOpts, errorC chan kv.Error) (oStore *objStore, err kv.Error)
NewObjStore is used to instantiate an object store for the running that includes a cache
func NewPrometheusClient ¶
func NewPrometheusClient(url string) (cli *prometheusClient)
NewPrometheusClient will instantiate the structure used to communicate with a remote prometheus endpoint
func ObjStoreFootPrint ¶
func ObjStoreFootPrint() (max int64)
ObjStoreFootPrint can be used to determine what the cxurrent footprint of the artifact cache is
func ParseBytes ¶
ParseBytes returns a value for the input string.
This function uses the humanize library from github for go.
Typical inputs can include by way of examples '6gb', '6 GB', '6 GiB'. Inputs support SI and IEC sizes. For more information please review https://github.com/dustin/go-humanize/blob/master/bytes.go
func PingRMQServer ¶
PingRMQServer is used to validate the a RabbitMQ server is alive and active on the administration port.
amqpURL is the standard client amqp uri supplied by a caller. amqpURL will be parsed and converted into the administration endpoint and then tested.
Types ¶
type ArtifactCache ¶
type ArtifactCache struct { sync.Mutex // This can be used by the application layer to receive diagnostic and other information // about kv.occurring inside the caching tracker etc and surface these kv.etc to // the logging system ErrorC chan kv.Error // contains filtered or unexported fields }
ArtifactCache is used to encapsulate and store hashes, typically file hashes, and prevent duplicated uploads from occurring needlessly
func NewArtifactCache ¶
func NewArtifactCache() (cache *ArtifactCache)
NewArtifactCache initializes an hash tracker for artifact related files and passes it back to the caller. The tracking structure can be used to track files that already been downloaded / uploaded and also includes a channel that can be used to receive error notifications
func (*ArtifactCache) Close ¶
func (cache *ArtifactCache) Close()
Close will clean up the cache of hashes and close the error reporting channel associated with the cache tracker
func (*ArtifactCache) Fetch ¶
func (cache *ArtifactCache) Fetch(ctx context.Context, art *request.Artifact, projectId string, group string, maxBytes int64, env map[string]string, dir string) (size int64, warns []kv.Error, err kv.Error)
Fetch can be used to retrieve an artifact from a storage layer implementation, while passing through the lens of a caching filter that prevents unneeded downloads.
func (*ArtifactCache) Hash ¶
func (cache *ArtifactCache) Hash(ctx context.Context, art *request.Artifact, projectId string, group string, env map[string]string, dir string) (hash string, err kv.Error)
Hash is used to obtain the hash of an artifact from the backing store implementation being used by the storage implementation
func (*ArtifactCache) Local ¶
Local returns the local disk based file name for the artifacts expanded archive files
func (*ArtifactCache) Restore ¶
func (cache *ArtifactCache) Restore(ctx context.Context, art *request.Artifact, projectId string, group string, env map[string]string, dir string) (uploaded bool, warns []kv.Error, err kv.Error)
Restore the artifacts that have been marked mutable and that have changed
type Backoffs ¶
type Backoffs struct {
// contains filtered or unexported fields
}
Backoffs uses a cache with TTL on the cache items to maintain a set of blocking directive for resources, where the cache expiry time is the applicable time for the blocker
func GetBackoffs ¶
func GetBackoffs() (b *Backoffs)
GetBackoffs retrieves a reference to a singleton of the Backoffs structure
type LocalQueue ¶
type LocalQueue struct { RootDir string // full file path to root queues "server" directory // contains filtered or unexported fields }
LocalQueue "project" is basically a local root directory containing queues sub-directories.
func NewLocalQueue ¶
func (*LocalQueue) Exists ¶
Exists will check that file queue named "subscription" does exist as sub-directory under root "server" directory.
func (*LocalQueue) GetShortQName ¶
GetShortQName GetShortQueueName is useful for storing queue specific information in collections etc
func (*LocalQueue) HasWork ¶
func (fq *LocalQueue) HasWork(ctx context.Context, subscription string) (hasWork bool, err kv.Error)
HasWork will look at the local file queue to see if there is any pending work. The function is called in an attempt to see if there is any point in processing new work without a lot of overhead.
func (*LocalQueue) IsEncrypted ¶
func (fq *LocalQueue) IsEncrypted() (encrypted bool)
func (*LocalQueue) Refresh ¶
func (fq *LocalQueue) Refresh(ctx context.Context, matcher *regexp.Regexp, mismatcher *regexp.Regexp) (known map[string]interface{}, err kv.Error)
Refresh will examine the local file queues "server" and extract a list of the queues that relate to StudioML work.
func (*LocalQueue) Responder ¶
func (fq *LocalQueue) Responder(ctx context.Context, subscription string, encryptKey *rsa.PublicKey) (sender chan *runnerReports.Report, err kv.Error)
Responder is used to open a connection to an existing response queue if one was made available and also to provision a channel into which the runner can place report messages
func (*LocalQueue) Work ¶
func (fq *LocalQueue) Work(ctx context.Context, qt *task.QueueTask) (msgProcessed bool, resource *server.Resource, err kv.Error)
Work will connect to the FileQueue "server" identified in the receiver, fq, and will see if any work can be found on the queue identified by the go runner subscription and present work to the handler for processing
type RabbitMQ ¶
type RabbitMQ struct { Identity string // A URL stripped of the user name and password, making it safe for logging etc // contains filtered or unexported fields }
RabbitMQ encapsulated the configuration and extant extant client for a queue server
func NewRabbitMQ ¶
func NewRabbitMQ(queueURI string, manageURI string, creds string, w wrapper.Wrapper, logger *log.Logger) (rmq *RabbitMQ, err kv.Error)
NewRabbitMQ takes the uri identifing a server and will configure the client data structure needed to call methods against the server
The order of these two parameters needs to reflect key, value pair that the GetKnown function returns
func (*RabbitMQ) AttachMgmt ¶
func (*RabbitMQ) Exists ¶
Exists will connect to the rabbitMQ server identified in the receiver, rmq, and will query it to see if the queue identified by the studio go runner subscription exists
func (*RabbitMQ) GetKnown ¶
func (rmq *RabbitMQ) GetKnown(ctx context.Context, matcher *regexp.Regexp, mismatcher *regexp.Regexp) (found map[string]task.QueueDesc, err kv.Error)
GetKnown will connect to the rabbitMQ server identified in the receiver, rmq, and will query it for any queues that match the matcher regular expression
found contains a map of keys that have an uncredentialed URL, and the value which is the user name and password for the URL
The URL path is going to be the vhost and the queue name
func (*RabbitMQ) GetShortQName ¶
GetShortQueueName is useful for storing queue specific information in collections etc
func (*RabbitMQ) HasWork ¶
HasWork will look at the SQS queue to see if there is any pending work. The function is called in an attempt to see if there is any point in processing new work without a lot of overhead. In the case of RabbitMQ at the moment we always assume there is work.
func (*RabbitMQ) IsEncrypted ¶
func (*RabbitMQ) Publish ¶
Publish is a shim method for tests to use for sending requeues to a queue
func (*RabbitMQ) QueueDeclare ¶
QueueDeclare is a shim method for creating a queue within the rabbitMQ server defined by the receiver
func (*RabbitMQ) QueueDestroy ¶
QueueDestroy is a shim method for creating a queue within the rabbitMQ server defined by the receiver
func (*RabbitMQ) Refresh ¶
func (rmq *RabbitMQ) Refresh(ctx context.Context, matcher *regexp.Regexp, mismatcher *regexp.Regexp) (known map[string]interface{}, err kv.Error)
Refresh will examine the RMQ exchange a extract a list of the queues that relate to StudioML work from the rmq exchange.
func (*RabbitMQ) Responder ¶
func (rmq *RabbitMQ) Responder(ctx context.Context, subscription string, encryptKey *rsa.PublicKey) (sender chan *runnerReports.Report, err kv.Error)
Responder is used to open a connection to an existing response queue if one was made available and also to provision a channel into which the runner can place report messages
func (*RabbitMQ) Work ¶
func (rmq *RabbitMQ) Work(ctx context.Context, qt *task.QueueTask) (msgProcessed bool, resource *server.Resource, err kv.Error)
Work will connect to the rabbitMQ server identified in the receiver, rmq, and will see if any work can be found on the queue identified by the go runner subscription and present work to the handler for processing
type Singularity ¶
Singularity is a data structure that contains the description of a singularity container resource
func NewSingularity ¶
NewSingularity is used to instantiate a singularity resource based upon a request, typically sent across a go channel or similar
func (*Singularity) Close ¶
func (*Singularity) Close() (err kv.Error)
Close is a stub method for termination of a singularity resource
func (*Singularity) Make ¶
func (s *Singularity) Make(alloc *resources.Allocated, e interface{}) (err kv.Error)
Make is used to write a script file that is generated for the specific TF tasks studioml has sent to retrieve any python packages etc then to run the task
type Storage ¶
type Storage interface { // Gather will retrieve contents of the named storage object using a prefix treating any items retrieved as individual files, invokes Fetch // Gather(ctx context.Context, keyPrefix string, outputDir string, maxBytes int64, tap io.Writer, failFast bool) (size int64, warnings []kv.Error, err kv.Error) // Fetch will retrieve contents of the named storage object and optionally unpack it into the // user specified output directory // Fetch(ctx context.Context, name string, unpack bool, output string, maxBytes int64, tap io.Writer) (size int64, warnings []kv.Error, err kv.Error) // Hoard will take a number of files for upload, deduplication is implemented outside of this interface // Hoard(ctx context.Context, srcDir string, keyPrefix string) (warnings []kv.Error, err kv.Error) // Deposit is a directory archive and upload, deduplication is implemented outside of this interface // Deposit(ctx context.Context, src string, dest string) (warnings []kv.Error, err kv.Error) // Hash can be used to retrieve the hash of the contents of the file. The hash is // retrieved not computed and so is a lightweight operation common to both S3 and Google Storage. // The hash on some storage platforms is not a plain MD5 but uses multiple hashes from file // segments to increase the speed of hashing and also to reflect the multipart download // processing that was used for the file, for a full explanation please see // https://stackoverflow.com/questions/12186993/what-is-the-algorithm-to-compute-the-amazon-s3-etag-for-a-file-larger-than-5gb // Hash(ctx context.Context, name string) (hash string, err kv.Error) Close() }
Storage defines an interface for implementations of a studioml artifact store
type StoreOpts ¶
type StoreOpts struct { Art *request.Artifact ProjectID string Group string Env map[string]string Validate bool }
StoreOpts is used to encapsulate a storage implementation with the runner and studioml data needed
type TimeEMA ¶
TimeEMA is used to store exponential moving averages for a time duration
func NewTimeEMA ¶
NewTimeEMA creates a new exponential moving average of a time duration for a set of time windows with an initial execution time duration set
func (*TimeEMA) Get ¶
Get retrieves a single time duration moving average for a specified window of time
type Trigger ¶
type Trigger struct { T <-chan struct{} C chan time.Time // contains filtered or unexported fields }
Trigger is a data structure that encapsulates a timer and a channel which together are used to in turn to send messages to a downstream go channel. The main Trigger use case is to allow a regular action to be scheduled via a timer and also to allow unit tests for example to activate the same action.
func NewTrigger ¶
NewTrigger accepts a timer and a channel that together can be used to send messages into a channel that is encapsulated within the returned t data structure
type VirtualEnv ¶
type VirtualEnv struct { Request *request.Request Script string ResponseQ chan<- *runnerReports.Report // contains filtered or unexported fields }
VirtualEnv encapsulated the context that a python virtual environment is to be instantiated from including items such as the list of pip installables that should be loaded and shell script to run.
func NewVirtualEnv ¶
func NewVirtualEnv(rqst *request.Request, dir string, uniqueID string, responseQ chan<- *runnerReports.Report) (env *VirtualEnv, err kv.Error)
NewVirtualEnv builds the VirtualEnv data structure from data received across the wire from a studioml client.
func (*VirtualEnv) Close ¶
func (*VirtualEnv) Close() (err kv.Error)
Close is used to close any resources which the encapsulated VirtualEnv may have consumed.
func (*VirtualEnv) Make ¶
func (p *VirtualEnv) Make(alloc *resources.Allocated, e interface{}) (err kv.Error)
Make is used to write a script file that is generated for the specific TF tasks studioml has sent to retrieve any python packages etc then to run the task
func (*VirtualEnv) Run ¶
Run will use a generated script file and will run it to completion while marshalling results and files from the computation. Run is a blocking call and will only return upon completion or termination of the process it starts. Run is called by the processor runScript receiver.