Documentation ¶
Index ¶
- Constants
- func Add(values ...interface{}) (int, error)
- func FormatCurrentTime(fmt string) string
- func FormatTime(fmt string, t interface{}) (string, error)
- func GetOutboundIP() (net.IP, error)
- func LoadNamespace(source DataSource, namespaces []string) (*template.Template, error)
- func RunNow(ctx context.Context, p *Pipeline)
- func RunPipelineForever(ctx context.Context, p *Pipeline, d time.Duration)
- func SendTo(s Sink, key, value string) error
- func TemplateError(format string, args ...interface{}) (string, error)
- func Timestamp() int64
- func ToInt(i interface{}) (int, error)
- func ToJSON(src []byte) (v interface{}, err error)
- func ToString(x interface{}) string
- type Client
- type DataSource
- type FileSystem
- type HTTPTask
- type InputTask
- type Interval
- type JSONPipe
- type JSONValidationTask
- type Link
- type LoadTask
- type MQTTClient
- type MQTTTask
- type MemoryStore
- type Pipe
- type PipeFunc
- type Pipeline
- type PipelineConfig
- type PipelineContext
- type PipelineTask
- type Plumber
- func (plumber Plumber) NewPipeline(conf *PipelineConfig) (*Pipeline, error)
- func (plumber Plumber) SetClient(taskType string, client Client)
- func (plumber Plumber) SetSink(taskType string, sink Sink)
- func (plumber Plumber) SetSource(taskType string, source DataSource)
- func (plumber Plumber) SetTemplateSource(taskType string, source DataSource)
- type Settings
- type SimpleJSONPipe
- type Sink
- type State
- type Status
- type StoreTask
- type Task
- type TemplatePipe
- type TemplateTask
- type Trigger
Constants ¶
const (
	Waiting = State(iota) // not running/failed
	Running
	Success
	Failed   // failed in an unrecoverable way, or exceeded retries
	Retrying // failed, but might be able to succeed after retry
)
Variables ¶
This section is empty.
Functions ¶
func FormatCurrentTime ¶
FormatCurrentTime returns the current UTC time formatted by the given string.
func FormatTime ¶
FormatTime formats the given time according to the given format.
The time must be either a Go time.Time or an int, which is interpreted as the number of milliseconds since Jan 1, 1970, in UTC. Any other type results in an error.
func GetOutboundIP ¶
func LoadNamespace ¶
func LoadNamespace(source DataSource, namespaces []string) (*template.Template, error)
LoadNamespace loads a template namespace from a given source.
If the source is a FileSystem and a namespace ID does not have an extension, .gotmpl is automatically appended to that ID.
func RunPipelineForever ¶
RunPipelineForever repeatedly schedules the Pipeline's execution until the context is canceled.
The Pipeline is executed once immediately. After each execution completes, the given duration elapses before the next one starts; it is _NOT_ the interval between starts, but the time from the end of one execution to the start of the next.
If the duration is zero or negative, the Pipeline will execute as often as possible.
func SendTo ¶
SendTo is a convenience wrapper to send a string value to a Sink without managing the context.
func TemplateError ¶
func Timestamp ¶
func Timestamp() int64
Timestamp returns the number of milliseconds since Jan 1, 1970 UTC.
Types ¶
type Client ¶
A Client generates a Pipe from a Task definition.
func NewSinkClient ¶
NewSinkClient returns a Client to store data in a given Sink.
func NewSourceClient ¶
func NewSourceClient(source DataSource) Client
NewSourceClient returns a Client to load data from the DataSource.
func NewTaskType ¶
NewTaskType returns a Client that can be registered directly as a new task type.
The resulting Client generates Pipes that, when executed, execute the underlying Pipeline, making it possible to create complex new task types simply by composing existing pipeline definitions.
func NewTemplateClient ¶
func NewTemplateClient(src DataSource) Client
NewTemplateClient returns a new Client that returns TemplatePipes.
type DataSource ¶
type DataSource interface {
Get(ctx context.Context, key string) (data []byte, wasPresent bool, err error)
}
DataSource returns data from a source, or possibly a default value if the key wasn't present. If the source returns a default, it should indicate this by returning `false` for wasPresent.
type FileSystem ¶
type FileSystem struct {
// contains filtered or unexported fields
}
FileSystem loads files from a base directory.
func NewFileSystem ¶
func NewFileSystem(base string) FileSystem
NewFileSystem returns a new FileSystem using the given base directory.
The given base doesn't have to be absolute, but GetFile avoids "moving up" past the base directory path.
type HTTPTask ¶
type HTTPTask struct {
	MaxRetries     int                 `json:"maxRetries"`
	Method         string              `json:"method,omitempty"`
	URL            string              `json:"url,omitempty"`
	Body           json.RawMessage     `json:"body,omitempty"`
	Headers        map[string][]string `json:"headers,omitempty"`
	SkipCertVerify bool                `json:"skipCertVerify"`
}
HTTPTask executes an HTTP request.
type InputTask ¶
type InputTask struct {
Default json.RawMessage `json:"default"`
}
InputTask serves input to Pipeline-based task types.
Optionally, Input tasks can include default values.
type Interval ¶
type JSONPipe ¶
JSONPipe adapts a function into a Client that unmarshals the Task's raw data into the returned Pipe.
type JSONValidationTask ¶
type LoadTask ¶
type LoadTask struct {
	Key     string          `json:"name"`
	Default json.RawMessage `json:"default"` // default value, if not present
	// contains filtered or unexported fields
}
LoadTask loads data from a DataSource using a key. If the key isn't present, it'll return the Default, which may be nil.
type MQTTClient ¶
type MQTTClient struct {
	ClientID       string
	Username       string
	Password       string
	Endpoint       string
	TimeoutSecs    int
	SkipCertVerify bool
	// contains filtered or unexported fields
}
func (*MQTTClient) UnmarshalJSON ¶
func (mqttClient *MQTTClient) UnmarshalJSON(data []byte) error
type MQTTTask ¶
type MQTTTask struct {
Message []json.RawMessage `json:"message"`
}
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore is a concurrency-safe in-memory k/v store.
func NewMemoryStore ¶
func NewMemoryStore() *MemoryStore
NewMemoryStore returns a new instance of a MemoryStore.
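For readers unfamiliar with the pattern, a concurrency-safe in-memory k/v store is typically a map guarded by a mutex. The sketch below shows that general technique; the store type and its put/get methods are hypothetical and do not describe MemoryStore's actual methods.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical concurrency-safe key/value store. Its method names
// are illustrative and are not taken from MemoryStore.
type store struct {
	mu   sync.RWMutex
	data map[string][]byte
}

func newStore() *store {
	return &store{data: make(map[string][]byte)}
}

func (s *store) put(key string, value []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Copy the value so later changes by the caller don't alter stored data.
	s.data[key] = append([]byte(nil), value...)
}

func (s *store) get(key string) ([]byte, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

func main() {
	s := newStore()
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.put(fmt.Sprintf("k%d", i), []byte{byte(i)})
		}(i)
	}
	wg.Wait()
	v, ok := s.get("k3")
	fmt.Println(v, ok)
}
```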
type Pipe ¶
type Pipe interface {
	// Execute executes an operation and writes its result to the given writer.
	// If the Pipe doesn't have a result, it may choose to simply not write
	// anything. An empty result isn't typically considered an error, but a Task
	// may choose to view an empty result as an error explicitly. The input map
	// represents data from Links, mapped as linkName: pipeOutput.
	Execute(ctx context.Context, w io.Writer, input map[string][]byte) error
}
Pipe implementers handle the work required to execute a Task.
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline is a series of Pipes through which data can flow.
Pipelines hold on to a reference to their underlying PipelineConfig. Multiple Pipelines can share the same PipelineConfig, and although you can execute a Pipeline concurrently multiple times, you should not modify the PipelineConfig while a Pipeline using it is running.
func (*Pipeline) Execute ¶
Execute a Pipeline by running each task in dependency order until either the Pipeline is complete, or a Task indicates that the Pipeline should not continue.
Execution may also be canceled via the Context, but it is up to tasks to play nice and respect cancellation requests; at the very least, the pipeline will stop as soon as the current task completes.
type PipelineConfig ¶
type PipelineConfig struct {
	Name          string  `json:"name"`
	Description   string  `json:"description"`
	Trigger       Trigger `json:"trigger"`
	Tasks         taskMap `json:"tasks"`
	TimeoutSecs   *int    `json:"timeoutSeconds,omitempty"`
	DefaultOutput *string `json:"defaultOutput"`
}
func (*PipelineConfig) Validate ¶
func (pc *PipelineConfig) Validate() error
Validate a PipelineConfig by ensuring all required fields are set, all references have a known referent, and the task graph is acyclic.
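The reference and cycle checks can be illustrated with Kahn's algorithm, which also yields a dependency order of the kind Execute relies on. This is a sketch of the general technique, not the package's validation code; executionOrder and the plain map standing in for the task graph are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// executionOrder is a hypothetical helper. deps maps each task name to the
// names of the tasks it depends on. It returns the tasks ordered so that each
// one follows its dependencies, or an error for unknown references or cycles.
func executionOrder(deps map[string][]string) ([]string, error) {
	indegree := make(map[string]int, len(deps))
	dependents := make(map[string][]string)
	for task, ds := range deps {
		indegree[task] = len(ds)
		for _, d := range ds {
			if _, ok := deps[d]; !ok {
				return nil, fmt.Errorf("task %q depends on unknown task %q", task, d)
			}
			dependents[d] = append(dependents[d], task)
		}
	}

	// Start with the tasks that have no dependencies (sorted for stable output).
	var ready []string
	for task, n := range indegree {
		if n == 0 {
			ready = append(ready, task)
		}
	}
	sort.Strings(ready)

	var order []string
	for len(ready) > 0 {
		task := ready[0]
		ready = ready[1:]
		order = append(order, task)
		next := dependents[task]
		sort.Strings(next)
		for _, t := range next {
			indegree[t]--
			if indegree[t] == 0 {
				ready = append(ready, t)
			}
		}
	}

	// Any task left unordered is part of, or downstream of, a cycle.
	if len(order) != len(deps) {
		return nil, errors.New("task graph contains a cycle")
	}
	return order, nil
}

func main() {
	order, err := executionOrder(map[string][]string{
		"fetch":  nil,
		"render": {"fetch"},
		"store":  {"render"},
	})
	fmt.Println(order, err)

	_, err = executionOrder(map[string][]string{"a": {"b"}, "b": {"a"}})
	fmt.Println(err)
}
```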
type PipelineContext ¶
type PipelineContext struct {
// contains filtered or unexported fields
}
PipelineContext represents the current execution state of a Pipeline.
Every time a Pipeline is executed, a new PipelineContext is created for it, used to store the status and output of the various Pipes during execution.
type PipelineTask ¶
type PipelineTask struct {
	OutputTask string                     `json:"outputTask"`
	Inputs     map[string]json.RawMessage `json:"inputs"`
	// contains filtered or unexported fields
}
PipelineTask allows you to use another Pipeline as if it were its own Task type.
The output of the Pipeline is the output of the task with name "OutputTask". Inputs may come from links, as usual, or may be given as "Defaults".
func (*PipelineTask) Execute ¶
Execute runs the attached Pipeline with data from other Pipes.
It sets up a new pipeline context and prepares it with the data from the Task's raw input values. It creates the initial job contexts, and for any Input tasks, it marks the job successfully completed and assigns it its value: either data linked from a Pipe, or its default value if nothing was piped in.
After preparing the context, it executes the underlying Pipeline. When it completes, it finds the job associated with the output Task, checks its status, and sends the job's output to the writer.
Note that while the underlying pipeline runs in a different PipelineContext, they share the same Go context.Context, so the "calling" pipeline's context controls timeouts -- not the pipeline on which this Pipe is based.
type Plumber ¶
type Plumber struct {
	// Clients defines how the Plumber maps Tasks to Pipes.
	Clients map[string]Client
}
Plumber constructs Pipelines.
The Plumber keeps a map of task types to Clients, which interpret Task definitions to generate Pipes. The Plumber parses the config and links these Pipes together.
func NewPlumber ¶
func NewPlumber() Plumber
NewPlumber returns a new Plumber with the default task types.
You can modify the Clients map to change how tasks are constructed and which tasks are allowed.
By default, the Plumber includes the following task types:
- http: execute an HTTPTask with Go's default HTTP client
- validation: validate some data against a JSON schema
- input: accept input from another task, as is used for Pipeline-based Clients.
func (Plumber) NewPipeline ¶
func (plumber Plumber) NewPipeline(conf *PipelineConfig) (*Pipeline, error)
NewPipeline constructs a new Pipeline from a given PipelineConfig.
func (Plumber) SetSource ¶
func (plumber Plumber) SetSource(taskType string, source DataSource)
SetSource configures the Plumber to supply data from a given source.
func (Plumber) SetTemplateSource ¶
func (plumber Plumber) SetTemplateSource(taskType string, source DataSource)
SetTemplateSource adds a template client under the given name.
type SimpleJSONPipe ¶
type SimpleJSONPipe func() Pipe
SimpleJSONPipe adapts a function that just returns a new instance of a Pipe into a Client that unmarshals the Task's Raw data into that Pipe.
type State ¶
type State int
State represents the current execution state of a task or pipeline.
func (State) MarshalJSON ¶
type Status ¶
type Status struct {
	State       State
	StartedAt   time.Time
	CompletedAt time.Time
	Err         error
	Attempts    int // number of times executed
}
Status tracks the status of a Pipeline or Job.
func (*Status) Duration ¶
Duration returns the Duration of time between CompletedAt and StartedAt.
If CompletedAt.IsZero() is true (likely implying that the State is Waiting), then this method returns a Duration of 0.
func (Status) MarshalJSON ¶
type StoreTask ¶
type StoreTask struct {
	Key   string `json:"name"`
	Value json.RawMessage
	// contains filtered or unexported fields
}
StoreTask sends data to a Sink.
type Task ¶
type Task struct {
	TaskType         string          `json:"type"`
	Raw              json.RawMessage `json:"raw,omitempty"`
	Links            map[string]Link `json:"links,omitempty"`        // dependencies
	Successes        []string        `json:"ifSuccessful,omitempty"` // tasks that must finish, but data doesn't matter
	Failures         []string        `json:"ifFailed,omitempty"`     // not currently used
	StopIfEmpty      bool            `json:"stopIfEmpty"`
	ErrorIfEmpty     bool            `json:"errorIfEmpty"`
	DisableResultLog bool            `json:"disableResultLog"`
	ContinueOnError  bool            `json:"continueOnError"`
	// contains filtered or unexported fields
}
Task tells a Plumber how it should construct a specific Pipe.
Just as Blueprints are configuration for Pipelines, Tasks are configuration for Pipes. It's up to the Plumber to interpret how the Task should be satisfied.
Tasks can be directly unmarshaled from JSON.
func (*Task) Dependencies ¶
type TemplatePipe ¶
type TemplatePipe struct {
// contains filtered or unexported fields
}
type TemplateTask ¶
type TemplateTask struct {
	Data         map[string]json.RawMessage `json:"initialData,omitempty"`
	Namespaces   []string                   `json:"namespaces"`
	TemplateName string                     `json:"template"`
}