core

package module
v0.18.0
Warning

This package is not in the latest version of its module.

Published: Jan 3, 2020 License: Apache-2.0 Imports: 17 Imported by: 0

README

Open Integration - a pipeline execution engine

stop using YAMLs

This is not production ready stuff

Concepts

  • A compiled, binary pipeline

  • State - the engine holds the state of all the tasks

  • Service - a standalone binary that exposes an API over HTTP/2 (gRPC); each call the engine can trigger is called an endpoint.

  • Task - a request from a service to run some logic.

  • Endpoint - each endpoint of a service is defined by two JSON schema files, arguments.json and returns.json. The engine enforces that the arguments given by a task and the output it produces match these schemas.

Example

  • An example pipeline can be found here. The pipeline scans my Trello board and updates rows in my Google Spreadsheet; once a card has been moved to the "Done" list, it is archived from the board.

Documentation

Index

Constants

const (
	EventEngineStarted  = "engine.started"
	EventEngineFinished = "engine.finished"
	EventTaskStarted    = "task.started"
	EventTaskFinished   = "task.finished"
)
const (
	// EngineStateInProgress pipeline in execution progress
	EngineStateInProgress string = "in-progress"
	// EngineStateFinished pipeline has finished execution
	EngineStateFinished string = "finished"

	// TaskStateInProgress task is in progress
	TaskStateInProgress string = EngineStateInProgress

	// TaskStateFinished task is finished
	TaskStateFinished string = EngineStateFinished

	// TaskStatusSuccess is set on the task status when the task finished successfully
	TaskStatusSuccess = "Success"
	// TaskStatusFailed is set on the task status when the task finished and failed
	TaskStatusFailed = "failed"
)

Variables

This section is empty.

Functions

func ConditionEngineStarted

func ConditionEngineStarted(ev *Event, state *State) bool

ConditionEngineStarted returns true once the engine emits the engine.started event

func ConditionTaskFinishedWithStatus

func ConditionTaskFinishedWithStatus(task string, status string) func(ev *Event, state *State) bool

ConditionTaskFinishedWithStatus returns a condition function that reports true once the given task has finished with the given status
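
The closure-returning shape of ConditionTaskFinishedWithStatus can be illustrated with a small sketch. The types below are local stand-ins that mirror only a few of the documented fields, and taskFinishedWithStatus is a hypothetical helper; how the real function inspects the event and the state is not shown on this page, so the matching logic here is an assumption.

```go
package main

import "fmt"

// Local stand-ins mirroring a subset of the documented types.
// They are NOT the real core package types.
type ID string

type EventMetadata struct {
	Name string
	ID   ID
}

type Event struct {
	Metadata EventMetadata
	Payload  map[string]interface{}
}

type TaskState struct {
	State  string
	Status string
	Task   string
}

type State struct {
	Tasks map[ID]TaskState
}

// Values copied from the documented constants.
const (
	EventTaskFinished = "task.finished"
	TaskStateFinished = "finished"
	TaskStatusSuccess = "Success"
)

// taskFinishedWithStatus is a hypothetical condition factory. It assumes the
// condition should hold when a task.finished event arrives and the state
// records the named task as finished with the requested status.
func taskFinishedWithStatus(task, status string) func(ev *Event, state *State) bool {
	return func(ev *Event, state *State) bool {
		if ev == nil || state == nil || ev.Metadata.Name != EventTaskFinished {
			return false
		}
		for _, ts := range state.Tasks {
			if ts.Task == task && ts.State == TaskStateFinished && ts.Status == status {
				return true
			}
		}
		return false
	}
}

func main() {
	state := &State{Tasks: map[ID]TaskState{
		"1": {State: TaskStateFinished, Status: TaskStatusSuccess, Task: "fetch-cards"},
	}}
	ev := &Event{Metadata: EventMetadata{Name: EventTaskFinished, ID: "1"}}

	cond := taskFinishedWithStatus("fetch-cards", TaskStatusSuccess)
	fmt.Println(cond(ev, state))
}
```

A function of this shape fits the Func field of Condition, which takes the same (*Event, *State) arguments and returns a bool.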

Types

type Argument

type Argument struct {
	Key   string
	Value interface{}
	// Func returns a dynamic value, used instead of Argument.Value
	Func func() interface{}
}

Argument is a key-value struct that should be passed in a service call

func (*Argument) GetKey

func (a *Argument) GetKey() string

func (*Argument) GetValue

func (a *Argument) GetValue() interface{}
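
The interplay between Value and Func can be sketched as follows. This is a local re-declaration of Argument for illustration only. The body of GetValue is an assumption based on the field comment (Func, when set, supplies the value instead of Value); the real implementation is not shown on this page.

```go
package main

import "fmt"

// Argument is a local copy of the documented struct, for illustration only.
type Argument struct {
	Key   string
	Value interface{}
	Func  func() interface{}
}

// GetKey returns the argument key.
func (a *Argument) GetKey() string { return a.Key }

// GetValue is an assumed implementation: when Func is set it is called and
// its result is returned, otherwise the static Value is returned.
func (a *Argument) GetValue() interface{} {
	if a.Func != nil {
		return a.Func()
	}
	return a.Value
}

func main() {
	static := &Argument{Key: "board", Value: "personal"}

	calls := 0
	dynamic := &Argument{Key: "attempt", Func: func() interface{} {
		calls++ // evaluated lazily, each time the value is requested
		return calls
	}}

	fmt.Println(static.GetKey(), static.GetValue())
	fmt.Println(dynamic.GetKey(), dynamic.GetValue())
	fmt.Println(dynamic.GetKey(), dynamic.GetValue())
}
```

Under this assumption, Func is useful for values that are only known when the task actually runs, such as output produced by an earlier task.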

type Condition

type Condition struct {
	// Name of the condition
	Name string
	// Func runs all the times
	Func func(*Event, *State) bool
}

Condition determines whether to run a task

type Engine

type Engine interface {
	Run() error
}

Engine exposes the interface of the engine

func NewEngine

func NewEngine(opt *EngineOptions) Engine

NewEngine creates a new engine

type EngineOptions

type EngineOptions struct {
	Logger    logger.Logger
	Pipeline  Pipeline
	State     *State
	EventChan chan *Event
}

EngineOptions holds the options used to create a new engine

type Event

type Event struct {
	Metadata EventMetadata
	Payload  map[string]interface{}
}

Event means that something happened

type EventMetadata

type EventMetadata struct {
	Name      string
	ID        ID
	CreatedAt time.Time
}

type ID

type ID string

ID is a unique ID

type Pipeline

type Pipeline struct {
	Metadata PipelineMetadata
	Spec     PipelineSpec
}

Pipeline is the pipeline representation

type PipelineMetadata

type PipelineMetadata struct {
	Name         string
	OS           string
	Architecture string
}

PipelineMetadata holds all the metadata of a pipeline

type PipelineSpec

type PipelineSpec struct {
	Tasks    []Task
	Services []Service
}

PipelineSpec is the spec of a pipeline

type Service

type Service struct {
	// Name of an official service that is part of the service catalog (https://github.com/open-integration/core-services/releases)
	Name string
	// Version of the service, empty string will use the latest version from catalog
	Version string
	// Path is a location on the local filesystem where the service can be found; Path cannot be set together with Name
	Path string
	// As is an alias used to refer to the service in the task implementation
	As string
}

Service is a Service a pipeline should execute
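
The field comments state that Path cannot be set together with Name. A minimal sketch of checking that rule before handing a pipeline to the engine is given below. Service is re-declared locally, validateService is a hypothetical helper that is not part of the package, and the requirement that at least one of the two fields is set is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Service is a local copy of the documented struct, for illustration only.
type Service struct {
	Name    string
	Version string
	Path    string
	As      string
}

// validateService is a hypothetical helper. The mutual exclusion of Name and
// Path comes from the field comments; rejecting a service with neither field
// set is an assumption made for this sketch.
func validateService(s Service) error {
	switch {
	case s.Name != "" && s.Path != "":
		return errors.New("service: Name and Path cannot be set together")
	case s.Name == "" && s.Path == "":
		return errors.New("service: one of Name or Path must be set")
	}
	return nil
}

func main() {
	fromCatalog := Service{Name: "trello", As: "trello"}
	conflicting := Service{Name: "trello", Path: "./bin/trello", As: "trello"}

	fmt.Println(validateService(fromCatalog))
	fmt.Println(validateService(conflicting))
}
```

The service name and local path used in main are placeholders, not entries confirmed to exist in the catalog.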

type State

type State struct {
	History []historyElem `yaml:"history"`

	Metadata StateMetadata    `yaml:"metadata"`
	Tasks    map[ID]TaskState `yaml:"tasks"`
	// contains filtered or unexported fields
}

State is the overall state of the pipeline execution

func NewState

func NewState(opt *StateOptions) *State

NewState creates a new state store

type StateMetadata

type StateMetadata struct {
	// State is in-progress / finished
	State string `yaml:"state"`
}

StateMetadata is the metadata of the state

type StateOptions

type StateOptions struct {
	EventCn          chan *Event
	Logger           logger.Logger
	StateFile        string
	EventHistoryFile string
}

StateOptions holds the options used to create a state

type Task

type Task struct {
	Metadata TaskMetadata
	Spec     TaskSpec
	SpecFunc func(*State) (*TaskSpec, error)
	// Condition a set of conditions to run before task execution
	Condition *Condition
}

Task is a task a pipeline should execute

type TaskMetadata

type TaskMetadata struct {
	Name string
	// Reusable set to true to run the task multiple times
	Reusable bool
}

TaskMetadata holds all the metadata of a task

type TaskSpec

type TaskSpec struct {
	Service   string
	Endpoint  string
	Arguments []Argument
}

TaskSpec is the spec of a task

type TaskState

type TaskState struct {
	State   string `yaml:"state"`
	Status  string `yaml:"status"`
	Task    string `yaml:"task"`
	Output  string `yaml:"output"`
	Error   string `yaml:"error"`
	EventID ID     `yaml:"event-id"`
}

TaskState is the state of a single task

Directories

Path Synopsis
internal
pkg
