Documentation ¶
Index ¶
- Constants
- func RandomFileNameInTempDir() string
- func TmpFile() string
- type Context
- func DRMAA2SessionManagerContext(sm drmaa2interface.SessionManager) *Context
- func ErrorTestContext() *Context
- func NewProcessContext() *Context
- func NewProcessContextByCfg(cfg ProcessConfig) *Context
- func NewProcessContextByCfgWithInitParams(cfg ProcessConfig, initParams simpletracker.SimpleTrackerInitParams) *Context
- func NewRemoteContext(cfg RemoteConfig, initParams *client.ClientTrackerParams) *Context
- func NewSingularityContext() *Context
- func NewSingularityContextByCfg(cfg SingularityConfig) *Context
- func (c *Context) Error() error
- func (c *Context) GetNextContextTaskID() int64
- func (c *Context) HasError() bool
- func (c *Context) OnError(f func(e error)) *Context
- func (c *Context) WithDefaultDockerImage(image string) *Context
- func (c *Context) WithDefaultJobTemplate(t drmaa2interface.JobTemplate) *Context
- func (c *Context) WithSessionName(jobSessionName string) *Context
- func (c *Context) WithUniqueSessionName() *Context
- type Iterator
- type Job
- func (j *Job) After(d time.Duration) *Job
- func (j *Job) AnyFailed() bool
- func (j *Job) Do(f func(job drmaa2interface.Job)) *Job
- func (j *Job) ErrorP(prompt string) string
- func (j *Job) Errored() bool
- func (j *Job) ExitStatus() int
- func (j *Job) ForAll(f func(drmaa2interface.Job, interface{}) error, params interface{})
- func (j *Job) ForEach(f func(drmaa2interface.Job, interface{}) error, params interface{}) error
- func (j *Job) HasAnyFailed() bool
- func (j *Job) JobID() string
- func (j *Job) JobInfo() drmaa2interface.JobInfo
- func (j *Job) JobInfos() []drmaa2interface.JobInfo
- func (j *Job) Kill() *Job
- func (j *Job) LastError() error
- func (j *Job) ListAll() []drmaa2interface.Job
- func (j *Job) ListAllFailed() []drmaa2interface.Job
- func (j *Job) Notify(n *Notifier) *Job
- func (j *Job) Observe(o Observer) *Job
- func (j *Job) OnError(f func(err error)) *Job
- func (j *Job) OnErrorPanic() *Job
- func (j *Job) OnFailure(f func(job drmaa2interface.Job)) *Job
- func (j *Job) OnFailureRun(cmd string, args ...string) *Job
- func (j *Job) OnFailureRunT(jt drmaa2interface.JobTemplate) *Job
- func (j *Job) OnSuccess(f func(job drmaa2interface.Job)) *Job
- func (j *Job) OnSuccessRun(cmd string, args ...string) *Job
- func (j *Job) OnSuccessRunT(jt drmaa2interface.JobTemplate) *Job
- func (j *Job) Output() string
- func (j *Job) OutputError() string
- func (j *Job) OutputP(prompt string) string
- func (j *Job) OutputsForJobIDs(jobIDs []string) map[string]string
- func (j *Job) ReapAll() *Job
- func (j *Job) Resubmit(r int) *Job
- func (j *Job) Resume() *Job
- func (j *Job) Retry(r int) *Job
- func (j *Job) RetryAnyFailed(amount int) *Job
- func (j *Job) Run(cmd string, args ...string) *Job
- func (j *Job) RunArray(begin, end, step, maxParallel int, cmd string, args ...string) *Job
- func (j *Job) RunArrayT(begin, end, step, maxParallel int, jt drmaa2interface.JobTemplate) *Job
- func (j *Job) RunEvery(d time.Duration, end time.Time, cmd string, args ...string) error
- func (j *Job) RunEveryT(d time.Duration, end time.Time, jt drmaa2interface.JobTemplate) error
- func (j *Job) RunMatrixT(jt drmaa2interface.JobTemplate, x, y Replacement) *Job
- func (j *Job) RunT(t drmaa2interface.JobTemplate) *Job
- func (j *Job) State() drmaa2interface.JobState
- func (j *Job) Success() bool
- func (j *Job) Suspend() *Job
- func (j *Job) Synchronize() *Job
- func (j *Job) Tag() string
- func (j *Job) TagWith(tag string) *Job
- func (j *Job) Template() *drmaa2interface.JobTemplate
- func (j *Job) Then(f func(job drmaa2interface.Job)) *Job
- func (j *Job) ThenRun(cmd string, args ...string) *Job
- func (j *Job) ThenRunArray(begin, end, step, maxParallel int, cmd string, args ...string) *Job
- func (j *Job) ThenRunT(jt drmaa2interface.JobTemplate) *Job
- func (j *Job) Wait() *Job
- func (j *Job) WaitWithTimeout(timeout time.Duration) *Job
- type JobTemplateField
- type Notifier
- type Observer
- type OpenAIConfig
- type ProcessConfig
- type RemoteConfig
- type Replacement
- type RunPBehavior
- type SessionManagerType
- type SingularityConfig
- type Template
- func (t *Template) AddIterator(name string, itr Iterator) *Template
- func (t *Template) AddMap(name string, f Iterator) *Template
- func (t *Template) MapTo(system string) drmaa2interface.JobTemplate
- func (t *Template) Next() drmaa2interface.JobTemplate
- func (t *Template) NextMap(name string) drmaa2interface.JobTemplate
- type TemplatePromptType
- type Workflow
- func (w *Workflow) Error() error
- func (w *Workflow) HasError() bool
- func (w *Workflow) ListJobs() []*Job
- func (w *Workflow) Logger() log.Logger
- func (w *Workflow) NewJob() *Job
- func (w *Workflow) OnError(f func(e error)) *Workflow
- func (w *Workflow) OnErrorPanic() *Workflow
- func (w *Workflow) Run(cmd string, args ...string) *Job
- func (w *Workflow) RunArrayJob(begin, end, step, maxParallel int, cmd string, args ...string) *Job
- func (w *Workflow) RunArrayJobT(begin, end, step, maxParallel int, jt drmaa2interface.JobTemplate) *Job
- func (w *Workflow) RunMatrixT(jt drmaa2interface.JobTemplate, x, y Replacement) *Job
- func (w *Workflow) RunT(jt drmaa2interface.JobTemplate) *Job
- func (w *Workflow) SetLogLevel(logLevel log.LogLevel) *Workflow
- func (w *Workflow) SetLogger(log log.Logger) *Workflow
- func (flow *Workflow) TemplateP(prompt string, templateType ...TemplatePromptType) (drmaa2interface.JobTemplate, error)
- func (w *Workflow) WithLLMOpenAI(config OpenAIConfig) *Workflow
Constants ¶
const PromptJobTemplateConstructorTemplate string = `` /* 1616-byte string literal not displayed */
const PromptTemplateDarwinShellScript string = `` /* 390-byte string literal not displayed */
const PromptTemplateErrorTransform string = `` /* 345-byte string literal not displayed */
const PromptTemplateLinuxShellScript string = `` /* 447-byte string literal not displayed */
const PromptTemplateOutputTransform string = `` /* 346-byte string literal not displayed */
const PromptTemplatePythonScript string = `` /* 253-byte string literal not displayed */
Variables ¶
This section is empty.
Functions ¶
func RandomFileNameInTempDir ¶ added in v1.2.13
func RandomFileNameInTempDir() string
RandomFileNameInTempDir returns a random file name in the temporary directory. The file name contains a replacement template {{.ID}} which gets replaced by a different number for each task. The file is not created.
Types ¶
type Context ¶
type Context struct { CtxCreationErr error SM drmaa2interface.SessionManager SMType SessionManagerType DefaultDockerImage string // DefaultTemplate contains all default settings for job submission // which are copied (if not set) to Run() or RunT() methods DefaultTemplate drmaa2interface.JobTemplate // ContextTaskID is a number which is incremented for each submitted // task. After incrementing and before submitting the task // all occurencies of the "{{.ID}}" string in the job template // are replaced by the current task ID. Following fields are // evaluated: OuputPath, ErrorPath. The workflow can be started // with an offset by setting the ContextTaskID to a value > 0. ContextTaskID int64 // Mutext is used for protecting the ContextTaskID sync.Mutex // JobSessionName is set to "wfl" by default. It can be changed // to a custom name. The name is used to create a DRMAA2 session. JobSessionName string }
Context contains a pointer to execution backend and configuration for it.
func DRMAA2SessionManagerContext ¶
func DRMAA2SessionManagerContext(sm drmaa2interface.SessionManager) *Context
DRMAA2SessionManagerContext creates a new Context using any given DRMAA2 Session manager (implementing the drmaa2interface).
func NewProcessContext ¶
func NewProcessContext() *Context
NewProcessContext returns a new *Context which manages processes.
func NewProcessContextByCfg ¶
func NewProcessContextByCfg(cfg ProcessConfig) *Context
NewProcessContextByCfg returns a new *Context which manages processes which is configured by the ProcessConfig.
func NewProcessContextByCfgWithInitParams ¶ added in v1.2.7
func NewProcessContextByCfgWithInitParams(cfg ProcessConfig, initParams simpletracker.SimpleTrackerInitParams) *Context
NewProcessContextByCfgWithInitParams returns a new *Context which manages processes which is configured by the ProcessConfig.
func NewRemoteContext ¶ added in v1.2.8
func NewRemoteContext(cfg RemoteConfig, initParams *client.ClientTrackerParams) *Context
NewRemoteContext creates a wfl Context for executing jobs through a remote connection. The details of the server must be provided in the initParams.
func NewSingularityContext ¶ added in v1.0.1
func NewSingularityContext() *Context
NewSingularityContext creates a new Context which allows jobs to be run in Singularity containers. It only works with JobTemplate based run methods (like RunT()) as it requires the JobCategory set to the Singularity container image.
func NewSingularityContextByCfg ¶ added in v1.0.1
func NewSingularityContextByCfg(cfg SingularityConfig) *Context
NewSingularityContextByCfg creates a new Context which allows jobs to be run in Singularity containers. If the given SingularityConfig has set the DefaultImage to a valid Singularity image then the Run() methods are using that container image. That image can be overridden by the RunT() method when setting the JobCategory.
func (*Context) GetNextContextTaskID ¶ added in v1.2.12
func (*Context) OnError ¶
OnError executes a function when an error occurred during context creation with the error as parameter.
func (*Context) WithDefaultDockerImage ¶ added in v1.2.12
func (*Context) WithDefaultJobTemplate ¶ added in v1.2.12
func (c *Context) WithDefaultJobTemplate(t drmaa2interface.JobTemplate) *Context
func (*Context) WithSessionName ¶ added in v1.2.12
WithSessionName sets the JobSessionName in the context. The name is used to create a DRMAA2 session.
func (*Context) WithUniqueSessionName ¶ added in v1.3.0
WithUniqueSessionName creates a unique session name which is based on the current time and the process ID. Backends with persistent job storage (e.g. Docker) would otherwise mix up jobs from different application runs in the same flow if the same session name is used.
type Iterator ¶
type Iterator func(drmaa2interface.JobTemplate) drmaa2interface.JobTemplate
Iterator is a function which transforms a JobTemplate when called.
func NewEnvSequenceIterator ¶
NewEnvSequenceIterator returns an iterator which increments the environment variable env each time when called.
func NewTimeIterator ¶
NewTimeIterator returns a template iterator which returns a job template every d time.
type Job ¶
Job defines methods for job life-cycle management. A job is always bound to a workflow which defines the context and job session (logical separation of jobs) of the underlying backend. The Job object allows creating and managing tasks.
func (*Job) Do ¶
func (j *Job) Do(f func(job drmaa2interface.Job)) *Job
Do executes a function which gets the DRMAA2 job object as parameter. This allows working with the low-level DRMAA2 job object. In case of an array job submit the function is called on each job in the job array.
func (*Job) ErrorP ¶ added in v1.3.1
ErrorP applies a prompt to the last error message. The prompt is a textual description of the transformation which is applied to the error message. The transformation is done by using the OpenAI API. Note, that the output size is limited. If the output is too large the output might not be useful.
Examples for prompts: - "What is the reason for the error?" - "Explain the error and provide a solution" - "Translate the error message into Bayerisch (kind of German)"
func (*Job) ExitStatus ¶
ExitStatus waits until the previously submitted task is finished and returns the exit status of the task. In case of an internal error it returns -1.
func (*Job) ForAll ¶ added in v1.2.9
func (j *Job) ForAll(f func(drmaa2interface.Job, interface{}) error, params interface{})
ForAll executes a user defined function for each task of the job. The function has an interface as input parameter which can be used to pass additional data into or out of the function as a result (like a pointer to a struct or pointer to an output slice).
Unlike ForEach ForAll processes all tasks of the job/flow in parallel in goroutines. After starting the functions in parallel it waits until all goroutines are finished.
It is up to the caller to ensure that the function is thread safe and that the data type of the second argument is thread safe or protected by a mutex.
If you are unsure please use ForEach instead. For an example see the documentation of ForEach.
func (*Job) ForEach ¶ added in v1.3.0
func (j *Job) ForEach(f func(drmaa2interface.Job, interface{}) error, params interface{}) error
ForEach executes a user defined function for each task of the job. The function has an interface as input parameter which can be used to pass additional data into or out of the function as a result (like a pointer to a struct or pointer to an output slice). The second argument of _ForEach_ must be a pointer to the data type or nil. The iteration stops when all reachable tasks are processed or the user defined function returns an error for one task.
ForEach processes all tasks of the job/flow iteratively. ForAll processes all tasks of the job/flow in parallel and waits until all tasks are finished.
Example:
getJobIDs := func(j drmaa2interface.Job, i interface{}) error { jobIDs := i.(*[]string) // i is a pointer to a slice of strings here *jobIDs = append(*jobIDs, j.GetID()) return nil } var jobIDs []string // slice of strings which is passed to the function flow.ForEach(getJobIDs, &jobIDs) // jobIDs now contains all job IDs of the flow
func (*Job) HasAnyFailed ¶
HasAnyFailed returns true if there is any failed task in the chain. Note that the function implicitly waits until all tasks have finished.
func (*Job) JobInfo ¶
func (j *Job) JobInfo() drmaa2interface.JobInfo
JobInfo returns information about the last task/job. Which values are actually set depends on the DRMAA2 implementation of the backend specified in the context. TODO job array support
func (*Job) JobInfos ¶
func (j *Job) JobInfos() []drmaa2interface.JobInfo
JobInfos returns all JobInfo objects of all tasks/job run in the workflow. JobInfo contains run-time details of the jobs. The availability of the values depends on the underlying DRMAA2 implementation of the execution Context. TODO job array support
func (*Job) LastError ¶
LastError returns the error, if any, that occurred during the last job operation. Don't use LastError() to find the reason why a job was failing! Check exit code / stderr output etc.
func (*Job) ListAll ¶ added in v1.2.9
func (j *Job) ListAll() []drmaa2interface.Job
ListAll returns all tasks as slice of DRMAA2 jobs. If there is no task the function returns an empty slice.
func (*Job) ListAllFailed ¶
func (j *Job) ListAllFailed() []drmaa2interface.Job
ListAllFailed returns all tasks which failed as array of DRMAA2 jobs. Note that it implicitly blocks and waits until all tasks are finished.
func (*Job) Observe ¶
Observe executes the functions defined in the Observer when task submission errors, the task failed, and when the job finished successfully. Note that this is a blocking call.
func (*Job) OnError ¶
OnError executes the given function if the last Job operation resulted in an error (like a job submission failure).
func (*Job) OnErrorPanic ¶ added in v1.3.1
OnErrorPanic panics and prints the error if the last Job operation resulted in an error. Otherwise the job is returned.
func (*Job) OnFailure ¶
func (j *Job) OnFailure(f func(job drmaa2interface.Job)) *Job
OnFailure executes the given function when the previous task in the list failed. Failed means the job was started successfully by the system but then exited with an exit code != 0.
When running the task resulted in an error (i.e. the job run function errored), then the function is not executed.
func (*Job) OnFailureRun ¶
OnFailureRun submits a task when the previous task ended in a state different than drmaa2interface.Done.
func (*Job) OnFailureRunT ¶
func (j *Job) OnFailureRunT(jt drmaa2interface.JobTemplate) *Job
OnFailureRunT submits a task when the previous job ended in a state different than drmaa2interface.Done.
func (*Job) OnSuccess ¶
func (j *Job) OnSuccess(f func(job drmaa2interface.Job)) *Job
OnSuccess executes the given function after the previously submitted task finished in the drmaa2interface.Done state.
func (*Job) OnSuccessRun ¶
OnSuccessRun submits a task when the previous task ended in the state drmaa2interface.Done.
func (*Job) OnSuccessRunT ¶
func (j *Job) OnSuccessRunT(jt drmaa2interface.JobTemplate) *Job
OnSuccessRunT submits a task when the previous task ended in the state drmaa2interface.Done.
func (*Job) Output ¶ added in v1.2.12
Output returns the output of the last task if it is in an end state. In case of OS process backend the output is read from the file specified in the JobTemplate.OutputPath. If it is not set, the output is printed to stdout and cannot be retrieved. When having multiple tasks, the output path of each task must be set to a different file otherwise the output will be overwritten. This can be achieved by having the {{.ID}} placeholder in the output path (check: OutputPath: wfl.RandomFileNameInTempDir())
Currently only supported for the default OS session manager, Docker session manager and Kubernetes session manager.
func (*Job) OutputError ¶ added in v1.3.1
func (*Job) OutputP ¶ added in v1.3.1
OutputP applies the given prompt to the output of the previous job if there is any. The prompt is a textual description of the transformation which is applied to the output. The transformation is done by using the OpenAI API. Note, that the context size is limited. If the output is too large the output might not be useful.
Examples for prompts: - "Translate the output in Schwäbisch (kind of German)" - "Create a summary of the output with max. 30 words" - use some output-specific questions...
func (*Job) OutputsForJobIDs ¶ added in v1.3.0
OutputsForJobIDs returns a map of each job ID and their output if jobIDs is nil. Otherwise only the output for the given job IDs is returned.
Only supported for the default session manager, docker session manager and kubernetes session manager.
func (*Job) ReapAll ¶ added in v1.2.3
ReapAll removes all job resources from the workload manager. It calls the DRMAA2 Reap() method for all tasks. The behavior is backend specific. After the ReapAll() call the job object should not be used anymore. Reap() must be called only when all tasks are in a terminated state.
func (*Job) Resubmit ¶
Resubmit starts the previously submitted task n-times. All tasks are executed in parallel.
func (*Job) Retry ¶
Retry waits until the last task in chain (not for the previous ones) is finished. When it failed it resubmits it and waits again for a successful end.
func (*Job) RetryAnyFailed ¶
RetryAnyFailed reruns any failed tasks and replaces them with a new task incarnation.
func (*Job) Run ¶
Run submits a task which executes the given command and args. The command needs to be available on the execution backend.
func (*Job) RunArray ¶ added in v1.2.1
RunArray executes the given command multiple times. If begin is set to 1 end to 10, and step to 1, it executes the command 10 times. Each job run gets a different internal array job task ID environment variable set which depends on the backend. The maxParallel parameter is respected only by some backends. It restricts the parallel execution to that amount of commands at any given time. If set to 1 it forces sequential execution. If not required it should be set to the total amount of tasks specified.
func (*Job) RunArrayT ¶ added in v1.2.6
func (j *Job) RunArrayT(begin, end, step, maxParallel int, jt drmaa2interface.JobTemplate) *Job
RunArrayT executes the job defined in a JobTemplate multiple times. See also RunArray().
func (*Job) RunEvery ¶
RunEvery provides the same functionality as RunEveryT but the job is created based on the given command with the arguments.
func (*Job) RunEveryT ¶
func (j *Job) RunEveryT(d time.Duration, end time.Time, jt drmaa2interface.JobTemplate) error
RunEveryT submits a job every d time.Duration regardless of whether the previous job is still running, finished, or failed. The method only aborts and returns an error if an error happened during job submission and the job could not be submitted.
func (*Job) RunMatrixT ¶ added in v1.2.9
func (j *Job) RunMatrixT(jt drmaa2interface.JobTemplate, x, y Replacement) *Job
RunMatrixT executes the job defined in a JobTemplate exactly len(x.Replacement)*len(y.Replacement) times. It generates a new job template for each combination of replacements executed from x and y on the given JobTemplate.
Example: Submit two different commands (sleep 1 and sleep 2) in two different container images, having 4 jobs in total submitted.
j.RunMatrixT(drmaa2interface.JobTemplate{ JobCategory: "{{image}}", RemoteCommand: "sleep", Args: []string{"{{arg}}", }, wfl.Replacement{ Fields: []string{"JobCategory"}, Pattern: "{{image}}", Replacements: []string{"busybox:latest", "golang:latest"}, }, wfl.Replacement{ Fields: []string{"Args"}, Pattern: "{{arg}}", Replacements: []string{"1", "2"}, }).WaitAll()
func (*Job) RunT ¶
func (j *Job) RunT(t drmaa2interface.JobTemplate) *Job
RunT submits a task specified by the given JobTemplate.
func (*Job) State ¶
func (j *Job) State() drmaa2interface.JobState
State returns the current state of the job previously submitted.
func (*Job) Success ¶
Success returns true in case the current task state equals drmaa2interface.Done and the job exit status is 0.
func (*Job) Suspend ¶
Suspend stops the last task of the job from execution. How this is done depends on the Context. Typically a signal (like SIGTSTP) is sent to the tasks of the job.
func (*Job) Synchronize ¶
Synchronize waits until the tasks of the job are finished. All jobs are terminated when the call returns.
func (*Job) TagWith ¶
TagWith tags a job with a string for identification. Global for all tasks of the job.
func (*Job) Template ¶
func (j *Job) Template() *drmaa2interface.JobTemplate
Template returns the JobTemplate of the previous job submission.
func (*Job) Then ¶
func (j *Job) Then(f func(job drmaa2interface.Job)) *Job
Then waits until the previous task is terminated and executes the given function by providing the DRMAA2 job interface which gives access to the low-level DRMAA2 job methods.
func (*Job) ThenRun ¶
ThenRun waits until the previous task is terminated and then executes the given command as new task.
func (*Job) ThenRunArray ¶ added in v1.2.1
ThenRunArray waits until the previous task is terminated and then executes the given command with its arguments as a new job array. See also RunArray().
func (*Job) ThenRunT ¶
func (j *Job) ThenRunT(jt drmaa2interface.JobTemplate) *Job
ThenRunT waits until the previous task is terminated and then executes a new task based on the given JobTemplate.
func (*Job) Wait ¶
Wait until the most recent task is finished. In case of a job array it waits for all tasks of the array.
func (*Job) WaitWithTimeout ¶ added in v1.3.2
WaitWithTimeout waits until the most recent task is finished. In case of a job array it waits for all tasks of the array. It returns either when the task is finished or the timeout is reached. In case of a timeout an error is set which can be retrieved with LastError().
type JobTemplateField ¶ added in v1.2.9
type JobTemplateField string
const ( RemoteCommand JobTemplateField = "RemoteCommand" Args JobTemplateField = "Args" SubmitAsHold JobTemplateField = "SubmitAsHold" ReRunnable JobTemplateField = "ReRunnable" JobEnvironment JobTemplateField = "JobEnvironment" WorkingDirectory JobTemplateField = "WorkingDirectory" JobCategory JobTemplateField = "JobCategory" Email JobTemplateField = "Email" EmailOnStarted JobTemplateField = "EmailOnStarted" EmailOnTerminated JobTemplateField = "EmailOnTerminated" JobName JobTemplateField = "JobName" InputPath JobTemplateField = "InputPath" OutputPath JobTemplateField = "OutputPath" ErrorPath JobTemplateField = "ErrorPath" JoinFiles JobTemplateField = "JoinFiles" ReservationID JobTemplateField = "ReservationID" QueueName JobTemplateField = "QueueName" MinSlots JobTemplateField = "MinSlots" MaxSlots JobTemplateField = "MaxSlots" Priority JobTemplateField = "Priority" CandidateMachines JobTemplateField = "CandidateMachines" MinPhysMemory JobTemplateField = "MinPhysMemory" MachineOS JobTemplateField = "MachineOS" MachineArch JobTemplateField = "MachineArch" StartTime JobTemplateField = "StartTime" DeadlineTime JobTemplateField = "DeadlineTime" StageInFiles JobTemplateField = "StageInFiles" StageOutFiles JobTemplateField = "StageOutFiles" ResourceLimits JobTemplateField = "ResourceLimits" AccountingID JobTemplateField = "AccountingID" )
type Notifier ¶
type Notifier struct {
// contains filtered or unexported fields
}
func NewNotifier ¶
func NewNotifier() *Notifier
NewNotifier creates a job notifier which allows to synchronize multiple job workflows executed concurrently in go functions. Note that there is an internal buffer of 1024 jobs which causes SendJob() to block if the buffer is full.
func (*Notifier) Destroy ¶
func (n *Notifier) Destroy()
Destroy closes the job channel inside the notifier.
func (*Notifier) ReceiveJob ¶
ReceiveJob returns a job sent to the notifier.
type Observer ¶
type Observer struct { ErrorHandler func(error) FailedHandler func(drmaa2interface.Job) SuccessHandler func(drmaa2interface.Job) }
Observer is a collection of functions which implement behavior that should be executed when a task submission failed, when the task failed, or when the job ran successfully.
func NewDefaultObserver ¶
func NewDefaultObserver() Observer
NewDefaultObserver returns an Observer which panics when a task submission error occurred, prints a message and exits the application when the task exits with error code != 0, and prints a message and continues when a task was running successfully.
type OpenAIConfig ¶ added in v1.3.1
type OpenAIConfig struct { // Token is the access token to OpenAI API (to create one: // https://platform.openai.com/account/api-keys) Token string `json:"token"` // RunPMethod defines how the RunP() method should be executed. Currently // there is a MacOS shell and Linux bash variant. The Linux shell variant // is the default. RunPMethod RunPBehavior `json:"runPBehavior"` // Model allows you to change from gpt-3.5-turbo-0301 to a larger model. // Like one with 32k tokens for processing larger job outputs. Model string `json:"model"` }
OpenAIConfig is used to configure the OpenAI API client to enable the xP() methods which expect a prompt.
type ProcessConfig ¶
type ProcessConfig struct { // DBFile is the local file which contains the internal state DB. DBFile string // DefaultTemplate contains the default job submission settings if // not overridden by the RunT() like methods. DefaultTemplate drmaa2interface.JobTemplate // PersistentJobStorage keeps job state on disk. This slows down // job submission but prevents waiting forever for processes which // disappeared PersistentJobStorage bool // JobDBFile is used when PersistentJobStorage is set to true. It must // be different from DBFile. JobDBFile string }
ProcessConfig contains the configuration for the process context.
type RemoteConfig ¶ added in v1.2.8
type RemoteConfig struct { LocalDBFile string // job session DB file // DefaultTemplate contains the default job submission settings if // not overridden by the RunT() like methods. DefaultTemplate drmaa2interface.JobTemplate }
type Replacement ¶ added in v1.2.9
type Replacement struct { // Fields are JobTemplate field names which are evaluated for // the pattern to get replaced. Special fields are: // - allStrings - all fields which are strings, string slices, // or string maps are going to be searched for the pattern // which is then replaced by one of the replacements. Fields []JobTemplateField // Pattern defines a string in the job template which is going to be // replaced by the value of the replacement string. Pattern string // Replacements defines all values the Pattern is going to be replaced // in the job template. For each replacement a new job template is // created and submitted. Replacements []string }
Replacement defines the fields and the values to be replaced in the workflow JobTemplate. The len(Replacements) defines how many job templates are generated for this replacement instruction.
type RunPBehavior ¶ added in v1.3.1
type RunPBehavior string
const ( RunPBehaviorLinuxShellScript RunPBehavior = "linuxshellscript" RunPBehaviorMacOSShellScript RunPBehavior = "darwinshellscript" )
type SessionManagerType ¶ added in v1.2.12
type SessionManagerType int
const ( // DefaultSessionManager handles jobs as processes DefaultSessionManager SessionManagerType = iota // DockerSessionManager manages Docker containers DockerSessionManager // CloudFoundrySessionManager manages Cloud Foundry application tasks CloudFoundrySessionManager // KubernetesSessionManager creates Kubernetes jobs KubernetesSessionManager // SingularitySessionManager manages Singularity containers SingularitySessionManager // SlurmSessionManager manages slurm jobs as cli commands SlurmSessionManager // LibDRMAASessionManager manages jobs through libdrmaa.so LibDRMAASessionManager // PodmanSessionManager manages jobs as podman containers either locally or remote PodmanSessionManager // RemoteSessionManager manages jobs over the network through a remote server RemoteSessionManager // ExternalSessionManager can be used by external JobTracker implementations // during development time before they get added here ExternalSessionManager // GoogleBatchSessionManager manages Google Cloud Batch jobs GoogleBatchSessionManager // MPIOperatorSessionManager manages jobs as MPI operator jobs on Kubernetes MPIOperatorSessionManager )
type SingularityConfig ¶ added in v1.0.1
type SingularityConfig struct { DefaultImage string DBFile string DefaultTemplate drmaa2interface.JobTemplate }
SingularityConfig contains the default settings for the Singularity containers.
type Template ¶
type Template struct { Jt drmaa2interface.JobTemplate // contains filtered or unexported fields }
Template is a higher level job template which simplifies the dynamic creation of JobTemplates.
func NewTemplate ¶
func NewTemplate(jt drmaa2interface.JobTemplate) *Template
NewTemplate creates a Template out of a drmaa2interface.JobTemplate
func (*Template) AddIterator ¶
AddIterator registers an iteration function which transforms the internal JobTemplate into another JobTemplate. The function is called each time when Next() is called. Multiple Iterators can be registered. The execution order of the Iterators is undefined and does not depend on the registration order.
func (*Template) AddMap ¶
AddMap registers a mapping function (same as Iterator) which converts the underlying DRMAA2 JobTemplate into a specific form. In contrast to the iterator functions it does not make any persistent changes to the job template. Its intention is to cover the differences required in the job template so that a job can run on different backends.
func (*Template) MapTo ¶
func (t *Template) MapTo(system string) drmaa2interface.JobTemplate
MapTo transforms the JobTemplate and returns it. It does not make changes to the underlying Template.
func (*Template) Next ¶
func (t *Template) Next() drmaa2interface.JobTemplate
Next applies all registered Iterators to the internal job template and returns the next version of the job template.
func (*Template) NextMap ¶
func (t *Template) NextMap(name string) drmaa2interface.JobTemplate
NextMap applies all registered Iterators to the internal job template and finally does a temporary mapping of the job template with the mapping function specified.
type TemplatePromptType ¶ added in v1.3.1
type TemplatePromptType int
const ( // PromptTemplateLinuxShellScript is the default template for creating // a Linux shell script. TemplatePromptTypeLinuxShellScript TemplatePromptType = iota // PromptTemplateDarwinShellScript is the template for creating a macOS // shell script. TemplatePromptTypeDarwinShellScript // PromptTemplatePythonScript is the template for creating a Python // script. TemplatePromptTypePythonScript )
type Workflow ¶
type Workflow struct {
// contains filtered or unexported fields
}
Workflow contains the backend context and a job session. The DRMAA2 job session provides typically logical isolation between jobs.
func NewWorkflow ¶
NewWorkflow creates a new Workflow based on the given execution context. Internally it creates a DRMAA2 JobSession which is used for separating jobs.
func (*Workflow) Error ¶
Error returns the error, if one happened while creating or opening a job session.
func (*Workflow) HasError ¶
HasError returns true if there was an error during creating a job session or opening a job session.
func (*Workflow) ListJobs ¶ added in v1.2.4
ListJobs returns all jobs visible in the workflow (i.e. available in the underlying drmaa2session). It may wrap one task in one Job object and return multiple Job objects even when only one Job with many tasks was submitted.
func (*Workflow) NewJob ¶ added in v1.2.9
NewJob creates a new empty Job object for the given workflow. Equivalent to NewJob(*Workflow).
func (*Workflow) OnError ¶
OnError executes a function if an error happened while creating or opening a job session.
func (*Workflow) OnErrorPanic ¶ added in v1.3.1
OnErrorPanic panics if an error happened while creating or opening a job session.
func (*Workflow) Run ¶
Run submits the first task in the workflow and returns the Job object. Same as NewJob(w).Run().
func (*Workflow) RunArrayJob ¶ added in v1.2.1
RunArrayJob executes the given command multiple times as specified with begin, end, and step. To run a command 10 times, begin can be set to 1, end to 10 and step to 1. maxParallel can limit the amount of executions which are running in parallel if supported by the context. The process context sets the TASK_ID env variable to the task ID.
func (*Workflow) RunArrayJobT ¶ added in v1.2.6
func (w *Workflow) RunArrayJobT(begin, end, step, maxParallel int, jt drmaa2interface.JobTemplate) *Job
RunArrayJobT executes the given job defined in the JobTemplate multiple times. See RunArrayJob().
func (*Workflow) RunMatrixT ¶ added in v1.2.9
func (w *Workflow) RunMatrixT(jt drmaa2interface.JobTemplate, x, y Replacement) *Job
func (*Workflow) RunT ¶
func (w *Workflow) RunT(jt drmaa2interface.JobTemplate) *Job
RunT submits the first task in the workflow and returns the Job object. Same as NewJob(w).RunT().
func (*Workflow) SetLogLevel ¶ added in v1.2.9
SetLogLevel changes the log level filter for the workflow. The default log level is log.Warning.
func (*Workflow) SetLogger ¶ added in v1.0.1
SetLogger sets a new logger for the workflow which writes processes internal log messages. Note that nil loggers are not accepted.
Example: w.SetLogger(log.NewKlogLogger("INFO"))
func (*Workflow) TemplateP ¶ added in v1.3.1
func (flow *Workflow) TemplateP(prompt string, templateType ...TemplatePromptType) (drmaa2interface.JobTemplate, error)
TemplateP creates a job template based on the description of the prompt. It creates a job template which executes the prompt as a shell script. Before executing the job template it MUST be checked for its safety!!! The template parameter is optional and can be used to define a template for the shell script. The default template is for generating a Linux shell script (alternative for macOS: wfl.TemplatePromptTypeDarwinShellScript).
func (*Workflow) WithLLMOpenAI ¶ added in v1.3.1
func (w *Workflow) WithLLMOpenAI(config OpenAIConfig) *Workflow
WithLLMOpenAI adds an OpenAI API client to the workflow. This enables the OutputP(), and ErrorP() methods defined on the Job struct and the TemplateP() method defined on the Workflow struct.
TemplateP("prompt") creates a job template based on the description of the prompt. Before executing the job template it should be checked for its safety! This is for research (or fun)! PLEASE EXECUTE THE JOB TEMPLATES ONLY WITH CARE IN AN ISOLATED ENVIRONMENT! THIS CAN BE DANGEROUS!
OutputP("What to do with the output") can be used to process the job output (internally retrieved by Output()) by using a textual description of the transformation applied to the output.
Note that the Output() must be available, i.e. for process workflows the stdout of the process must be written to a unique persistent file (OutputPath). Please check the Output() documentation for more details.
ErrorP() transforms the job error (retrieved by Error()) by using a textual description of the transformation task. For example, you can let the LLM explain the error and a possible solution.
Note that this is an experimental feature! It might be dropped or reworked in the future!