mariner

package
v0.0.0-...-fe8ab6f
Warning

This package is not in the latest version of its module.

Published: Nov 4, 2021 License: Apache-2.0 Imports: 46 Imported by: 0

Documentation


Constants

const (

	// cwl things //
	// parameter type
	CWLNullType      = "null"
	CWLFileType      = "File"
	CWLDirectoryType = "Directory"
	// object class
	CWLWorkflow        = "Workflow"
	CWLCommandLineTool = "CommandLineTool"
	CWLExpressionTool  = "ExpressionTool"
	// requirements
	CWLInitialWorkDirRequirement = "InitialWorkDirRequirement"
	CWLResourceRequirement       = "ResourceRequirement"
	CWLDockerRequirement         = "DockerRequirement"
	CWLEnvVarRequirement         = "EnvVarRequirement"
)

Constants and variables needed across the package are defined here.

Variables

var Config = loadConfig("/mariner-config/mariner-config.json")

Config is loaded from `mariner-config.json`, which is a configmap object in the k8s cluster with name `mariner-config`.

NOTE: when this is moved to cloud automation, the config will probably be put in the manifest which holds the config for all the other services, and the configmap name might change to `manifest-mariner`. When that happens, the following need to be updated:

1. mariner-config.json
2. mariner-deploy.yaml
3. engine job spec (DispatchWorkflowJob)
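
As a rough illustration of how a JSON config of this shape could be decoded, the sketch below unmarshals a small document into trimmed-down copies of the config types listed under Types. The helper name `parseConfig` and the sample values are assumptions made for this example; the package's own `loadConfig` is unexported and takes a file path, and its implementation is not shown on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed-down copies of the package's config types, for illustration only.
type S3Config struct {
	Name   string `json:"name"`
	Region string `json:"region"`
}

type Storage struct {
	S3 S3Config `json:"s3"`
}

type JobConfig struct {
	Labels         map[string]string `json:"labels"`
	ServiceAccount string            `json:"serviceaccount"`
	RestartPolicy  string            `json:"restart_policy"`
}

type Jobs struct {
	Engine JobConfig `json:"engine"`
	Task   JobConfig `json:"task"`
}

type MarinerConfig struct {
	Jobs    Jobs    `json:"jobs"`
	Storage Storage `json:"storage"`
}

// parseConfig is a hypothetical helper: it decodes raw JSON bytes
// into a MarinerConfig and wraps any decoding error.
func parseConfig(raw []byte) (*MarinerConfig, error) {
	cfg := &MarinerConfig{}
	if err := json.Unmarshal(raw, cfg); err != nil {
		return nil, fmt.Errorf("decoding mariner config: %w", err)
	}
	return cfg, nil
}

func main() {
	// Sample values are made up for this example.
	raw := []byte(`{
		"jobs": {"engine": {"serviceaccount": "engine-sa", "restart_policy": "never"}},
		"storage": {"s3": {"name": "example-bucket", "region": "us-east-1"}}
	}`)
	cfg, err := parseConfig(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(cfg.Storage.S3.Name, cfg.Jobs.Engine.ServiceAccount)
}
```

Fields missing from the JSON are left at their zero values, so a partial config decodes without error.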

Functions

func Engine

func Engine(runID string) (err error)

Engine runs an instance of the mariner engine job

func RunServer

func RunServer()

RunServer inits the mariner server

Types

type AWSUserCreds

type AWSUserCreds struct {
	Name string `json:"name"`
	Key  string `json:"key"`
}

AWSUserCreds ..

type ArboristResponse

type ArboristResponse struct {
	Auth bool `json:"auth"`
}

type AuthAction

type AuthAction struct {
	Service string `json:"service"`
	Method  string `json:"method"`
}

type AuthHTTPRequest

type AuthHTTPRequest struct {
	URL         string
	ContentType string
	Body        io.Reader
}

type AuthRequest

type AuthRequest struct {
	Resource string      `json:"resource"`
	Action   *AuthAction `json:"action"`
}

type CancelRunJSON

type CancelRunJSON struct {
	RunID  string `json:"runID"`
	Result string `json:"result"` // success or failed
}

type CleanupByParam

type CleanupByParam map[string]*DeleteCondition

CleanupByParam exists per workflow step. It is a collection of (outputParamID, DeleteCondition) pairs.

type CleanupByStep

type CleanupByStep map[string]CleanupByParam

CleanupByStep exists per workflow. It is a collection of (stepID, CleanupByParam) pairs.

type CleanupKey

type CleanupKey struct {
	StepID string
	Param  string
}

CleanupKey uniquely identifies (within a workflow) a set of files to monitor/delete

type CommandElement

type CommandElement struct {
	Position    int      // position from binding
	ArgPosition int      // index from arguments list, if argument
	Value       []string // representation of this input/arg on the commandline (after any/all valueFrom, eval, prefix, separators, shellQuote, etc. has been resolved)
}

CommandElement represents an input/argument on the commandline for commandlinetools

type CommandElements

type CommandElements []*CommandElement

CommandElements is a slice of CommandElement pointers. The type and its sort.Interface methods are defined so that these CommandElements can be sorted by position.

func (CommandElements) Len

func (cmdElts CommandElements) Len() int

Adapted from the first example at: https://golang.org/pkg/sort/

func (CommandElements) Less

func (cmdElts CommandElements) Less(i, j int) bool

func (CommandElements) Swap

func (cmdElts CommandElements) Swap(i, j int)
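
The three methods above satisfy sort.Interface. A minimal sketch of how such an implementation might look follows; the method bodies are not shown on this page, so the ordering used here (by Position, with ArgPosition as a tie-break) and the `flatten` helper are assumptions for illustration only.

```go
package main

import (
	"fmt"
	"sort"
)

// CommandElement mirrors the struct documented above.
type CommandElement struct {
	Position    int
	ArgPosition int
	Value       []string
}

type CommandElements []*CommandElement

func (c CommandElements) Len() int      { return len(c) }
func (c CommandElements) Swap(i, j int) { c[i], c[j] = c[j], c[i] }

// Less orders by Position; the ArgPosition tie-break is an assumption.
func (c CommandElements) Less(i, j int) bool {
	if c[i].Position != c[j].Position {
		return c[i].Position < c[j].Position
	}
	return c[i].ArgPosition < c[j].ArgPosition
}

// flatten is a hypothetical helper: it sorts the elements in place
// and concatenates their values into a single argument list.
func flatten(elts CommandElements) []string {
	sort.Stable(elts)
	var out []string
	for _, e := range elts {
		out = append(out, e.Value...)
	}
	return out
}

func main() {
	elts := CommandElements{
		{Position: 2, Value: []string{"out.txt"}},
		{Position: 1, Value: []string{"--in", "in.txt"}},
		{Position: 0, ArgPosition: 1, Value: []string{"-v"}},
	}
	fmt.Println(flatten(elts))
}
```

sort.Stable is used so that elements comparing equal keep their original relative order.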

type Container

type Container struct {
	Name            string          `json:"name"`
	Image           string          `json:"image"`
	PullPolicy      string          `json:"pull_policy"`
	Command         []string        `json:"command"`
	Lifecycle       Lifecycle       `json:"lifecycle"`
	SecurityContext SecurityContext `json:"securitycontext"`
	Resources       Resources       `json:"resources"`
}

Container ..

type Containers

type Containers struct {
	Engine    Container `json:"engine"`
	S3sidecar Container `json:"s3sidecar"`
	Task      Container `json:"task"`
	Gen3fuse  Container `json:"gen3fusesidecar"`
}

Containers ..

type DeleteCondition

type DeleteCondition struct {
	WorkflowOutput bool            // this is an output of the top-level workflow
	DependentSteps map[string]bool // static collection of steps which depend on using this output parameter
	Queue          *GoQueue        // each time a dependent step finishes, remove it from the Queue
}

DeleteCondition exists per output parameter. The aim is to delete all intermediate files as soon as they become unnecessary to downstream processes. The condition for deletion is: !WorkflowOutput && len(Queue.Map) == 0
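
The deletion condition can be sketched as follows. The types mirror DeleteCondition and GoQueue as documented on this page; the helpers `newDeleteCondition`, `stepFinished`, and `canDelete`, and the step names in `main`, are hypothetical and only illustrate how the queue might be drained as dependent steps finish.

```go
package main

import (
	"fmt"
	"sync"
)

// GoQueue mirrors the documented type: a map guarded by a RWMutex.
type GoQueue struct {
	sync.RWMutex
	Map map[string]bool
}

type DeleteCondition struct {
	WorkflowOutput bool
	DependentSteps map[string]bool
	Queue          *GoQueue
}

// newDeleteCondition (hypothetical) seeds the queue with every dependent step.
func newDeleteCondition(isWorkflowOutput bool, steps ...string) *DeleteCondition {
	dc := &DeleteCondition{
		WorkflowOutput: isWorkflowOutput,
		DependentSteps: make(map[string]bool),
		Queue:          &GoQueue{Map: make(map[string]bool)},
	}
	for _, s := range steps {
		dc.DependentSteps[s] = true
		dc.Queue.Map[s] = true
	}
	return dc
}

// stepFinished (hypothetical) removes a finished step from the queue.
func (dc *DeleteCondition) stepFinished(stepID string) {
	dc.Queue.Lock()
	defer dc.Queue.Unlock()
	delete(dc.Queue.Map, stepID)
}

// canDelete (hypothetical) reports whether the output is not a workflow
// output and no dependent step is still waiting on it.
func (dc *DeleteCondition) canDelete() bool {
	dc.Queue.RLock()
	defer dc.Queue.RUnlock()
	return !dc.WorkflowOutput && len(dc.Queue.Map) == 0
}

func main() {
	dc := newDeleteCondition(false, "align", "sort")
	fmt.Println("before steps finish:", dc.canDelete())
	dc.stepFinished("align")
	dc.stepFinished("sort")
	fmt.Println("after steps finish:", dc.canDelete())
}
```

DependentSteps stays unchanged as the static record of dependencies, while the queue shrinks as steps complete.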

type EventLog

type EventLog struct {
	sync.RWMutex
	Events []string `json:"events,omitempty"`
}

EventLog is an event logger for a mariner component (i.e., engine or task)

type File

type File struct {
	Class          string  `json:"class"`          // always CWLFileType
	Location       string  `json:"location"`       // path to file (same as `path`)
	Path           string  `json:"path"`           // path to file
	Basename       string  `json:"basename"`       // last element of location path
	NameRoot       string  `json:"nameroot"`       // basename without file extension
	NameExt        string  `json:"nameext"`        // file extension of basename
	DirName        string  `json:"dirname"`        // name of directory containing the file
	Contents       string  `json:"contents"`       // first 64 KiB of file as a string, if loadContents is true
	SecondaryFiles []*File `json:"secondaryFiles"` // array of secondaryFiles

}

File represents a CWL File object.

NOTE: the JSON representation of the field names is what gets loaded into the JS VM; see PreProcessContext() and its accompanying explanatory note. These JSON aliases are the field names defined by CWL for File objects.

See: https://www.commonwl.org/v1.0/Workflow.html#File

For logging, it would be nice to strip some of the redundant information, e.g., keep only Class, Path, Contents, and SecondaryFiles, with omitempty. That cannot be done by changing the JSON encoding here, because these encodings are used as context for parameter references and JS expressions. The stripped JSON marshalling would instead need a preprocessing step before a File object is written to the log, for example a wrapper around the File type (something like FileLog) that implements the desired, stripped JSON encoding.
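
One way the wrapper idea above might look is sketched below. FileLog and `toFileLog` are hypothetical names that do not exist in the package; the sketch copies only the fields the note mentions and leaves the File type's own JSON tags untouched.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// File mirrors the documented type; its JSON tags must stay as they are.
type File struct {
	Class          string  `json:"class"`
	Location       string  `json:"location"`
	Path           string  `json:"path"`
	Basename       string  `json:"basename"`
	NameRoot       string  `json:"nameroot"`
	NameExt        string  `json:"nameext"`
	DirName        string  `json:"dirname"`
	Contents       string  `json:"contents"`
	SecondaryFiles []*File `json:"secondaryFiles"`
}

// FileLog is a hypothetical log-only view of a File.
type FileLog struct {
	Class          string     `json:"class"`
	Path           string     `json:"path"`
	Contents       string     `json:"contents,omitempty"`
	SecondaryFiles []*FileLog `json:"secondaryFiles,omitempty"`
}

// toFileLog (hypothetical) converts a File, and recursively its
// secondary files, into the stripped representation.
func toFileLog(f *File) *FileLog {
	if f == nil {
		return nil
	}
	fl := &FileLog{Class: f.Class, Path: f.Path, Contents: f.Contents}
	for _, sf := range f.SecondaryFiles {
		fl.SecondaryFiles = append(fl.SecondaryFiles, toFileLog(sf))
	}
	return fl
}

func main() {
	f := &File{
		Class: "File", Location: "/data/a.bam", Path: "/data/a.bam",
		Basename: "a.bam", NameRoot: "a", NameExt: ".bam", DirName: "/data",
		SecondaryFiles: []*File{{Class: "File", Path: "/data/a.bam.bai"}},
	}
	b, err := json.Marshal(toFileLog(f))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(b))
}
```

Because the conversion happens just before logging, the encoding that the JS VM sees is unaffected.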

type GoQueue

type GoQueue struct {
	sync.RWMutex
	Map map[string]bool
}

GoQueue is safe for concurrent read/write

type IndexFileInfo

type IndexFileInfo struct {
	Filename string   `json:"file_name"`
	Filesize uint64   `json:"size"`
	URLs     []string `json:"urls"`
}

IndexFileInfo is the Indexd file struct.

type JWTDecoder

type JWTDecoder interface {
	Decode(string) (*map[string]interface{}, error)
}

type JobConfig

type JobConfig struct {
	Labels         map[string]string `json:"labels"`
	ServiceAccount string            `json:"serviceaccount"`
	RestartPolicy  string            `json:"restart_policy"`
}

JobConfig ..

type JobInfo

type JobInfo struct {
	UID    string `json:"uid"`
	Name   string `json:"name"`
	Status string `json:"status"`
}

JobInfo - k8s job information

type Jobs

type Jobs struct {
	Engine JobConfig `json:"engine"`
	Task   JobConfig `json:"task"`
}

Jobs ..

type K8sEngine

type K8sEngine struct {
	sync.RWMutex    `json:"-"`
	S3FileManager   *S3FileManager
	TaskSequence    []string            // for testing purposes
	UnfinishedProcs map[string]bool     // engine's stack of CLT's that are running; (task.Root.ID, Process) pairs
	FinishedProcs   map[string]bool     // engine's stack of completed processes; (task.Root.ID, Process) pairs
	CleanupProcs    map[CleanupKey]bool // engine's stack of running cleanup processes
	UserID          string              // the userID for the user who requested the workflow run
	RunID           string              // the workflow timestamp
	Manifest        *Manifest           // to pass the manifest to the gen3fuse container of each task pod
	Log             *MainLog            //
	KeepFiles       map[string]bool     // all the paths to not delete during basic file cleanup
}

K8sEngine runs all Tools, where a Tool is a CWL ExpressionTool or CommandLineTool.

NOTE: the engine object could store all the logs, event monitoring, and statistics for the workflow run. That would need a field with a sensible data structure for collecting, storing, and retrieving logs.

type Lifecycle

type Lifecycle struct {
	Prestop []string `json:"prestop"`
}

Lifecycle ..

type ListRunsJSON

type ListRunsJSON struct {
	RunIDs []string `json:"runIDs"`
}

type Log

type Log struct {
	Created        string                 `json:"created,omitempty"` // timezone???
	CreatedObj     time.Time              `json:"-"`
	LastUpdated    string                 `json:"lastUpdated,omitempty"` // timezone???
	LastUpdatedObj time.Time              `json:"-"`
	JobID          string                 `json:"jobID,omitempty"`
	JobName        string                 `json:"jobName,omitempty"`
	ContainerImage string                 `json:"containerImage,omitempty"`
	Status         string                 `json:"status"`
	Stats          *Stats                 `json:"stats"`
	Event          *EventLog              `json:"eventLog,omitempty"`
	Input          map[string]interface{} `json:"input"`
	Output         map[string]interface{} `json:"output"`
	Scatter        map[int]*Log           `json:"scatter,omitempty"`
}

Log stores the eventLog and runtime stats for a mariner component (i.e., engine or task). See: https://golang.org/pkg/time/

NOTE: the job spec could be logged per task, or at least the task container spec. It should be fine, but first double-check that the job spec contains no sensitive information. For now, only the container image pulled for the task is logged.

type LogHandler

type LogHandler struct {
	// contains filtered or unexported fields
}

See Arborist's logging.go. Server logging still needs to be handled.

type MainLog

type MainLog struct {
	sync.RWMutex `json:"-"`
	Path         string           `json:"path"` // tentative  - maybe can't write this - path to log file to write/update
	Request      *WorkflowRequest `json:"request"`
	Main         *Log             `json:"main"`
	ByProcess    map[string]*Log  `json:"byProcess"`
}

MainLog is the interface for writing logs to workflowHistorydb

type MainLogJSON

type MainLogJSON struct {
	Path      string           `json:"path"` // tentative  - maybe can't write this - path to log file to write/update
	Request   *WorkflowRequest `json:"request"`
	Main      *Log             `json:"main"`
	ByProcess map[string]*Log  `json:"byProcess"`
}

MainLogJSON gets written to workflowHistorydb

type Manifest

type Manifest []ManifestEntry

type ManifestEntry

type ManifestEntry struct {
	GUID string `json:"object_id"`
}

type MarinerConfig

type MarinerConfig struct {
	Containers Containers `json:"containers"`
	Jobs       Jobs       `json:"jobs"`
	Secrets    Secrets    `json:"secrets"`
	Storage    Storage    `json:"storage"`
}

MarinerConfig ..

type RequestJSON

type RequestJSON struct {
	User    *UserJSON    `json:"user"`
	Request *AuthRequest `json:"request"`
}

type Resource

type Resource struct {
	CPU    string `json:"cpu"`
	Memory string `json:"memory"`
}

Resource ..

type ResourceRequirement

type ResourceRequirement struct {
	Min int64 `json:"min"`
	Max int64 `json:"max"`
}

ResourceRequirement is for logging resource requests vs. actual usage

type ResourceUsage

type ResourceUsage struct {
	Series         ResourceUsageSeries `json:"data"`
	SamplingPeriod int                 `json:"samplingPeriod"`
}

ResourceUsage ..

type ResourceUsageSamplePoint

type ResourceUsageSamplePoint struct {
	CPU    int64 `json:"cpu"`
	Memory int64 `json:"mem"`
}

ResourceUsageSamplePoint ..

type ResourceUsageSeries

type ResourceUsageSeries []ResourceUsageSamplePoint

ResourceUsageSeries ..

type Resources

type Resources struct {
	Limits   Resource `json:"limits"`
	Requests Resource `json:"requests"`
}

Resources ..

type RunIDJSON

type RunIDJSON struct {
	RunID string `json:"runID"`
}

type RunLogJSON

type RunLogJSON struct {
	Log *MainLog `json:"log"`
}

type S3Config

type S3Config struct {
	Name   string `json:"name"`
	Region string `json:"region"`
}

S3Config ..

type S3FileManager

type S3FileManager struct {
	AWSConfig     *aws.Config
	S3BucketName  string
	MaxConcurrent int
}

S3FileManager ..

type Secrets

type Secrets struct {
	AWSUserCreds *AWSUserCreds `json:"awsusercreds"`
}

Secrets ..

type SecurityContext

type SecurityContext struct {
	Privileged bool `json:"privileged"`
}

SecurityContext .. Open question: should mariner also have run-as-user and run-as-group settings?

type Server

type Server struct {
	S3FileManager *S3FileManager
	// contains filtered or unexported fields
}

type Stats

type Stats struct {
	CPUReq        ResourceRequirement `json:"cpuReq"` // in-progress
	MemoryReq     ResourceRequirement `json:"memReq"` // in-progress
	ResourceUsage ResourceUsage       `json:"resourceUsage"`
	Duration      float64             `json:"duration"`  // okay - currently measured in minutes
	DurationObj   time.Duration       `json:"-"`         // okay
	NFailures     int                 `json:"nfailures"` // TODO
	NRetries      int                 `json:"nretries"`  // TODO
}

Stats holds performance stats for a given process. It is recorded for tasks as well as workflows. The runtime of a workflow is the sum of the runtimes of that workflow's steps.

type StatusJSON

type StatusJSON struct {
	Status string `json:"status"`
}

type Storage

type Storage struct {
	S3 S3Config `json:"s3"`
}

Storage ..

type Task

type Task struct {
	sync.RWMutex  `json:"-"`
	Parameters    cwl.Parameters         // input parameters of this task
	Root          *cwl.Root              // "root" of the "namespace" of the cwl file for this task
	Outputs       map[string]interface{} // output parameters of this task
	Scatter       []string               // if task is a step in a workflow and requires scatter; input parameters to scatter are stored here
	ScatterMethod string                 // if task is step in a workflow and requires scatter; scatter method specified - "dotproduct" or "flatcrossproduct" or ""
	ScatterTasks  map[int]*Task          // if task is a step in a workflow and requires scatter; scattered subtask objects stored here; scattered subtasks are enumerated
	ScatterIndex  int                    // if a task gets scattered, each subtask belonging to that task gets enumerated, and that index is stored here
	Children      map[string]*Task       // if task is a workflow; the Task objects of the workflow steps are stored here; {taskID: task} pairs
	OutputIDMap   map[string]string      // if task is a workflow; a map of {outputID: stepID} pairs in order to trace i/o dependencies between steps
	InputIDMap    map[string]string
	OriginalStep  *cwl.Step // if this task is a step in a workflow, this is the information from this task's step entry in the parent workflow's cwl file
	Done          *bool     // false until all output for this task has been collected, then true
	// --- New Fields ---
	Log           *Log           // contains Status, Stats, Event
	CleanupByStep *CleanupByStep // if task is a workflow; info for deleting intermediate files after they are no longer needed
}
A Task is a process, i.e., a node on the graph, and is one of [Workflow, CommandLineTool, ExpressionTool, ...].

Task is a nested object. A Task represents a process, which is a node on the graph (NOTE: maybe rename Task to Process, or something). So a Task is either a leaf in the graph (a Tool) or not a leaf in the graph (a workflow). If a Task is a workflow, then it has steps. These steps have their own representations as Task objects, which are stored in the Children field of the workflow's Task object.

So for a workflow Task, task.Children is a map where the keys are the taskIDs and the values are the Task objects of the workflow steps.
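
The nesting described above can be sketched as a recursive walk over Children. The Task type here is a stand-in reduced to two fields; the `ID` field and the `leaves` helper are hypothetical (in the real type the identifier comes from the task's Root), and the step names are made up.

```go
package main

import (
	"fmt"
	"sort"
)

// Task is a trimmed-down stand-in for the documented type.
type Task struct {
	ID       string           // hypothetical field for this sketch
	Children map[string]*Task // workflow steps, keyed by taskID
}

// leaves (hypothetical) returns the IDs of all Tools (leaf Tasks)
// reachable from t, in sorted order.
func leaves(t *Task) []string {
	if len(t.Children) == 0 {
		return []string{t.ID}
	}
	var out []string
	for _, child := range t.Children {
		out = append(out, leaves(child)...)
	}
	sort.Strings(out) // map iteration order is random, so sort for stable output
	return out
}

func main() {
	wf := &Task{ID: "main", Children: map[string]*Task{
		"align": {ID: "align"},
		"qc": {ID: "qc", Children: map[string]*Task{
			"fastqc":  {ID: "fastqc"},
			"multiqc": {ID: "multiqc"},
		}},
	}}
	fmt.Println(leaves(wf))
}
```

A subworkflow such as `qc` is itself a Task with Children, so the same function handles any depth of nesting.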

type TaskRuntimeJSContext

type TaskRuntimeJSContext struct {
	Outdir string `json:"outdir"`
}

TaskRuntimeJSContext gets loaded into the JS VM to allow inline JS expressions and parameter references in the CWL to be resolved. See: https://www.commonwl.org/v1.0/CommandLineTool.html#Runtime_environment

NOTE: not currently supported: tmpdir, cores, ram, outdirSize, tmpdirSize

type TokenContext

type TokenContext struct {
	User TokenUser `json:"user"`
}

type TokenInfo

type TokenInfo struct {
	UserID string
}

type TokenPayload

type TokenPayload struct {
	Context TokenContext `json:"context"`
}

type TokenUser

type TokenUser struct {
	Name string `json:"name"`
}

type Tool

type Tool struct {
	JobName          string // if a k8s job (i.e., if a CommandLineTool)
	JobID            string // if a k8s job (i.e., if a CommandLineTool)
	WorkingDir       string
	Command          *exec.Cmd
	StepInputMap     map[string]*cwl.StepInput
	ExpressionResult map[string]interface{}
	Task             *Task
	S3Input          []*ToolS3Input

	// dev'ing
	// need to load this with runtime context as per CWL spec
	// https://www.commonwl.org/v1.0/CommandLineTool.html#Runtime_environment
	// for now, only populating 'runtime.outdir'
	JSVM     *otto.Otto
	InputsVM *otto.Otto
}

Tool represents a leaf in the graph of a workflow, i.e., a Tool is either a CommandLineTool or an ExpressionTool.

If a Tool is a CommandLineTool, it gets run as a k8s job in its own container. When a k8s job gets created, a pointer to that Tool is pushed onto the k8s engine's stack of UnfinishedProcs. The k8s engine continuously iterates through the stack of running procs, retrieving job status from the k8s API. As soon as a job is complete, the pointer to the Tool is popped from the stack and a function is called to collect the output from that Tool's completed process.

Presently, ExpressionTools run in a JS VM in the mariner-engine, so they do not get dispatched as k8s jobs.
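
The polling behaviour described for CommandLineTools can be sketched roughly as below. This is not the engine's code: `jobStatus` stands in for a lookup against the k8s API, the status strings "Completed" and "Running" are assumed values, and `drain` is a hypothetical name. A real loop would also pause between polls.

```go
package main

import "fmt"

// jobStatus is a hypothetical stand-in for a k8s API job status lookup.
type jobStatus func(jobName string) string

// drain (hypothetical) keeps iterating over the unfinished jobs until
// none remain. Each completed job is removed and handed to collect.
func drain(unfinished map[string]bool, status jobStatus, collect func(jobName string)) {
	for len(unfinished) > 0 {
		for name := range unfinished {
			if status(name) == "Completed" { // assumed status value
				delete(unfinished, name) // deleting during range is safe in Go
				collect(name)
			}
		}
	}
}

func main() {
	// Fake status source: every job completes on its second poll.
	polls := map[string]int{}
	status := func(name string) string {
		polls[name]++
		if polls[name] >= 2 {
			return "Completed"
		}
		return "Running"
	}

	unfinished := map[string]bool{"task-a": true, "task-b": true}
	drain(unfinished, status, func(name string) {
		fmt.Println("collecting output for", name)
	})
	fmt.Println("remaining jobs:", len(unfinished))
}
```

The sketch is single-threaded; the K8sEngine type embeds a sync.RWMutex, which suggests the real maps are accessed concurrently and would need locking.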

type ToolS3Input

type ToolS3Input struct {
	URL         string `json:"url"`           // S3 URL
	Path        string `json:"path"`          // Local path for dl
	InitWorkDir bool   `json:"init_work_dir"` // is this an initwkdir requirement?
}

ToolS3Input ..

type UserJSON

type UserJSON struct {
	Token string `json:"token"`
}

type WorkflowRequest

type WorkflowRequest struct {
	Workflow json.RawMessage   `json:"workflow"`
	Input    json.RawMessage   `json:"input"`
	UserID   string            `json:"user"`
	Tags     map[string]string `json:"tags,omitempty"` // optional set of key:val pairs provided by user to annotate workflow run - NOTE: val is a string
	Manifest Manifest          `json:"manifest"`
	JobName  string            `json:"jobName,omitempty"` // populated internally by server

	// new: specify a service account for the workflow job
	ServiceAccountName string `json:"serviceAccountName,omitempty"`
}
