Documentation ¶
Index ¶
- func RegisterTaskdef(name string, f NewTaskFunc)
- type DatastoreTaskable
- type NewTaskFunc
- type Progress
- type Task
- func (t Task) DatastoreType() string
- func (t *Task) Delete(store datastore.Datastore) error
- func (task *Task) Do(store datastore.Datastore, tc chan *Task) error
- func (task *Task) Enqueue(store datastore.Datastore, amqpurl string) error
- func (t Task) GetId() string
- func (t Task) Key() datastore.Key
- func (t *Task) NewSQLModel(key datastore.Key) sql_datastore.Model
- func (t *Task) PubSubChannelName() string
- func (t *Task) QueueMsg() (amqp.Publishing, error)
- func (t *Task) Read(store datastore.Datastore) error
- func (t *Task) SQLParams(cmd sql_datastore.Cmd) []interface{}
- func (t *Task) SQLQuery(cmd sql_datastore.Cmd) string
- func (t *Task) Save(store datastore.Datastore) (err error)
- func (t *Task) StatusString() string
- func (t *Task) UnmarshalSQL(row sqlutil.Scannable) error
- type TaskRequests
- type Taskable
- type TasksEnqueueParams
- type TasksGetParams
- type TasksListParams
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RegisterTaskdef ¶
func RegisterTaskdef(name string, f NewTaskFunc)
RegisterTaskdef registers a task type. It must be called before a task of that type can be used.
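The register-then-construct pattern described here can be sketched as follows. This documentation does not show the package internals, so the `taskdefs` map, the lowercase `registerTaskdef` and `newTaskable` helpers, and the `echoTask` type are hypothetical stand-ins, and `Progress` and `Taskable` are trimmed local copies declared only so the sketch compiles on its own.

```go
package main

import "fmt"

// Progress and Taskable are trimmed local copies of the documented types.
type Progress struct {
	Done  bool
	Error error
}

type Taskable interface {
	Valid() error
	Do(updates chan Progress)
}

// NewTaskFunc matches the documented signature.
type NewTaskFunc func() Taskable

// taskdefs is a hypothetical registry; the real storage is not documented.
var taskdefs = map[string]NewTaskFunc{}

// registerTaskdef mirrors the documented RegisterTaskdef signature.
func registerTaskdef(name string, f NewTaskFunc) {
	taskdefs[name] = f
}

// newTaskable is a hypothetical lookup that fails for unregistered names.
func newTaskable(name string) (Taskable, error) {
	f, ok := taskdefs[name]
	if !ok {
		return nil, fmt.Errorf("unregistered task type: %q", name)
	}
	return f(), nil
}

// echoTask is a hypothetical task type that finishes immediately.
type echoTask struct{}

func (echoTask) Valid() error             { return nil }
func (echoTask) Do(updates chan Progress) { updates <- Progress{Done: true} }

func main() {
	registerTaskdef("echo", func() Taskable { return echoTask{} })

	_, err := newTaskable("echo")
	fmt.Println("echo resolved:", err == nil)

	_, err = newTaskable("missing")
	fmt.Println("missing rejected:", err != nil)
}
```

Registering from an `init` function in the file that defines the task type is one common way to make sure registration happens before any task is constructed.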
Types ¶
type DatastoreTaskable ¶
DatastoreTaskable is a task that has a method for assigning a datastore to the task. If your task needs access to a datastore, implement DatastoreTaskable. Task orchestrators will detect this method and call it to set the datastore before calling Taskable.Do.
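The detection step described above is typically a type assertion. The sketch below assumes a method named `SetDatastore`; the interface body is not shown in this documentation, so that name, the `Datastore` stand-in type, and the `prepare` helper are all hypothetical.

```go
package main

import "fmt"

// Datastore stands in for datastore.Datastore in this sketch.
type Datastore interface{}

// Progress and Taskable are trimmed local copies of the documented types.
type Progress struct {
	Done  bool
	Error error
}

type Taskable interface {
	Valid() error
	Do(updates chan Progress)
}

// datastoreTaskable is hypothetical: the method name SetDatastore is assumed.
type datastoreTaskable interface {
	Taskable
	SetDatastore(store Datastore)
}

// storeTask wants a datastore, so it implements the extra method.
type storeTask struct{ store Datastore }

func (t *storeTask) Valid() error                 { return nil }
func (t *storeTask) Do(updates chan Progress)     { updates <- Progress{Done: true} }
func (t *storeTask) SetDatastore(store Datastore) { t.store = store }

// plainTask does not need a datastore.
type plainTask struct{}

func (plainTask) Valid() error             { return nil }
func (plainTask) Do(updates chan Progress) { updates <- Progress{Done: true} }

// prepare injects the store when the task asks for one and reports
// whether injection happened.
func prepare(t Taskable, store Datastore) bool {
	dt, ok := t.(datastoreTaskable)
	if ok {
		dt.SetDatastore(store)
	}
	return ok
}

func main() {
	st := &storeTask{}
	fmt.Println(prepare(st, "sql-db"), st.store)
	fmt.Println(prepare(plainTask{}, "sql-db"))
}
```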
type NewTaskFunc ¶
type NewTaskFunc func() Taskable
NewTaskFunc is a function that creates new task instances. Task orchestrators use NewTaskFunc to create new Tasks, and then attempt to json.Unmarshal params into the task definition.
type Progress ¶
type Progress struct {
Percent float32 `json:"percent"` // percent complete between 0.0 & 1.0
Step int `json:"step"` // current Step
Steps int `json:"steps"` // number of Steps in the task
Status string `json:"status"` // status string that describes what is currently happening
Done bool `json:"done"` // complete flag
Dest string `json:"dest"` // place for sending users, could be a url, could be a relative path
Error error `json:"error,omitempty"` // error message
}
Progress represents the current state of a task. Tasks will be given a Progress channel to send updates.
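The receiving side of a Progress channel might look like the sketch below. The `watch` helper is hypothetical, and `Progress` is a local copy without the JSON tags; the stopping rule (stop on `Done` or a non-nil `Error`) follows the expectation stated in the Taskable comments.

```go
package main

import "fmt"

// Progress is a local copy of the documented struct, minus JSON tags.
type Progress struct {
	Percent float32
	Step    int
	Steps   int
	Status  string
	Done    bool
	Dest    string
	Error   error
}

// watch reads updates until one reports Done or carries an Error,
// then returns that final update.
func watch(updates <-chan Progress) Progress {
	var last Progress
	for p := range updates {
		last = p
		if p.Done || p.Error != nil {
			break
		}
	}
	return last
}

func main() {
	updates := make(chan Progress)

	// A stand-in producer that reports four steps and then finishes.
	go func() {
		const steps = 4
		for i := 1; i <= steps; i++ {
			updates <- Progress{
				Percent: float32(i) / steps,
				Step:    i,
				Steps:   steps,
				Status:  "working",
				Done:    i == steps,
			}
		}
	}()

	final := watch(updates)
	fmt.Println(final.Done, final.Step, final.Percent)
}
```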
type Task ¶
type Task struct {
// uuid identifier for task
Id string `json:"id"`
// created date rounded to seconds
Created time.Time `json:"created"`
// updated date rounded to seconds
Updated time.Time `json:"updated"`
// human-readable title for the task, meant to be descriptive & varied
Title string `json:"title"`
// id of user that submitted this task
UserId string `json:"userId"`
// Type of task to be executed
Type string `json:"type"`
// parameters supplied to the task, should be json bytes
Params map[string]interface{} `json:"params"`
// Status Message
Status string `json:"status,omitempty"`
// Error Message
Error string `json:"error,omitempty"`
// timestamp for when request was added to the tasks queue
// nil if request hasn't been sent to the queue
Enqueued *time.Time `json:"enqueued,omitempty"`
// timestamp for when the task was removed from the queue
// and started, nil if the request hasn't been started
Started *time.Time `json:"started,omitempty"`
// timestamp for when request succeeded
// nil if task hasn't succeeded
Succeeded *time.Time `json:"succeeded,omitempty"`
// timestamp for when request failed
// nil if task hasn't failed
Failed *time.Time `json:"failed,omitempty"`
// progress of this task's completion
// progress may not be stored, but instead kept ephemerally
Progress *Progress `json:"progress,omitempty"`
}
Task represents the storable state of a task. Note that this is not the "task" itself (the function that will be called to do the actual work associated with a task) but the state associated with performing a task. Task holds the type of work to be done, parameters to configure that work, and the status of that work. Different types of "work" are done by implementing the Taskable interface specified in taskdef.go. Many of the methods on Task overlap with Taskable. This is on purpose: Task wraps the phases of task completion to track the state of a task.
func TaskFromDelivery ¶
TaskFromDelivery reads a task from store based on an amqp.Delivery message
func (Task) DatastoreType ¶
DatastoreType fulfills the sql_datastore.Model interface. It distinguishes "Task" as a storable type. "Task" is not (yet) intended for use outside of Datatogether servers.
func (*Task) Enqueue ¶
Enqueue adds a task to the queue located at amqpurl, writing creates/updates for the task to the given store.
func (*Task) NewSQLModel ¶
func (t *Task) NewSQLModel(key datastore.Key) sql_datastore.Model
func (*Task) PubSubChannelName ¶
func (*Task) QueueMsg ¶
func (t *Task) QueueMsg() (amqp.Publishing, error)
QueueMsg formats the task as an amqp.Publishing message for adding to a queue
func (*Task) SQLParams ¶
func (t *Task) SQLParams(cmd sql_datastore.Cmd) []interface{}
func (*Task) StatusString ¶
StatusString returns a string representation of the status of a task based on the state of its date stamps.
type TaskRequests ¶
type TaskRequests struct {
// url to amqp server for enqueuing tasks, only required
// to fulfill requests, not submit them
AmqpUrl string
// Store to read / write tasks to, only required
// to fulfill requests, not submit them
Store datastore.Datastore
}
TaskRequests encapsulates all types of requests that can be made in relation to tasks, to be made available for RPC calls. TODO - should this internal state be moved into the package level via package-level setter funcs?
func (TaskRequests) Enqueue ¶
func (r TaskRequests) Enqueue(params *TasksEnqueueParams, task *Task) (err error)
Enqueue adds a task to the queue for completion.
func (TaskRequests) Get ¶
func (t TaskRequests) Get(args *TasksGetParams, res *Task) (err error)
func (TaskRequests) List ¶
func (t TaskRequests) List(args *TasksListParams, res *[]*Task) (err error)
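The method signatures above (an args pointer, a result pointer, and an error return) match the shape that Go's standard `net/rpc` package accepts. Whether the package is actually served through `net/rpc` is an assumption. The sketch below uses trimmed local copies of `Task`, `TasksGetParams`, and `TaskRequests`, replaces the `Store` field with an in-memory map, and adds a hypothetical `getOverRPC` helper that makes the call over an in-process pipe.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/rpc"
)

// Task and TasksGetParams are trimmed local copies of the documented types.
type Task struct {
	Id    string
	Title string
}

type TasksGetParams struct {
	Id string
}

// TaskRequests here holds an in-memory map in place of the documented
// Store field, so the sketch needs no datastore.
type TaskRequests struct {
	tasks map[string]Task
}

// Get has the same shape as the documented method.
func (r TaskRequests) Get(args *TasksGetParams, res *Task) error {
	t, ok := r.tasks[args.Id]
	if !ok {
		return errors.New("task not found")
	}
	*res = t
	return nil
}

// getOverRPC serves reqs on one end of an in-memory pipe and calls
// TaskRequests.Get from the other end.
func getOverRPC(reqs TaskRequests, id string) (Task, error) {
	srvConn, cliConn := net.Pipe()

	server := rpc.NewServer()
	if err := server.Register(reqs); err != nil {
		return Task{}, err
	}
	go server.ServeConn(srvConn)

	client := rpc.NewClient(cliConn)
	defer client.Close()

	var res Task
	err := client.Call("TaskRequests.Get", &TasksGetParams{Id: id}, &res)
	return res, err
}

func main() {
	reqs := TaskRequests{tasks: map[string]Task{
		"abc": {Id: "abc", Title: "archive a url"},
	}}

	t, err := getOverRPC(reqs, "abc")
	fmt.Println(t.Title, err)

	_, err = getOverRPC(reqs, "zzz")
	fmt.Println(err)
}
```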
type Taskable ¶
type Taskable interface {
// are these task params valid? return error if not
// this func will be called before adding the task to
// the queue, and won't be added on failure.
Valid() error
// Do the task, returning incremental progress updates
// it's expected that the func will send either
// p.Done == true or p.Error != nil at least once
// to signal that the task is either done or errored
Do(updates chan Progress)
}
Taskable is anything that fits on a task queue; it is a type of "work" that can be performed. Lots of things
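A minimal implementation of the interface might look like the sketch below. `countTask` and the `run` helper are hypothetical, and `Progress` and `Taskable` are local copies declared so the example compiles on its own. `run` calls `Valid` before doing any work, mirroring the rule that invalid tasks are never queued, and stops reading once an update reports `Done` or an `Error`.

```go
package main

import (
	"errors"
	"fmt"
)

// Progress and Taskable are local copies of the documented types.
type Progress struct {
	Percent float32
	Step    int
	Steps   int
	Status  string
	Done    bool
	Error   error
}

type Taskable interface {
	Valid() error
	Do(updates chan Progress)
}

// countTask is a hypothetical task that counts from 1 to N.
type countTask struct {
	N int `json:"n"`
}

// Valid rejects params that would leave the task with nothing to do.
func (t *countTask) Valid() error {
	if t.N <= 0 {
		return errors.New("n must be positive")
	}
	return nil
}

// Do sends one update per step and marks the last update as Done.
func (t *countTask) Do(updates chan Progress) {
	for i := 1; i <= t.N; i++ {
		updates <- Progress{
			Percent: float32(i) / float32(t.N),
			Step:    i,
			Steps:   t.N,
			Status:  "counting",
			Done:    i == t.N,
		}
	}
}

// run validates the task, runs it, and returns the terminal update.
func run(t Taskable) (Progress, error) {
	if err := t.Valid(); err != nil {
		return Progress{}, err
	}
	updates := make(chan Progress)
	go t.Do(updates)
	for p := range updates {
		if p.Done || p.Error != nil {
			return p, p.Error
		}
	}
	return Progress{}, errors.New("updates channel closed before completion")
}

func main() {
	final, err := run(&countTask{N: 3})
	fmt.Println(final.Step, final.Done, err)

	_, err = run(&countTask{})
	fmt.Println(err)
}
```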
func NewTaskable ¶
NewTaskable generates a new Taskable instance from the registered types
type TasksEnqueueParams ¶
type TasksEnqueueParams struct {
// Title of the task
// Requesters should generate their own task title for now
// tasks currently have no way of generating a sensible default title
Title string
// Type of task to perform
Type string
// User that initiated the request
UserId string
// Parameters to feed to the task
Params map[string]interface{}
}
TasksEnqueueParams are for enqueuing a task.
type TasksGetParams ¶
type TasksGetParams struct {
Id string
}
TasksGetParams are for getting a single Task. Currently, only lookup by ID is supported.