conductor

package
Version: v2.10.2+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2019 License: Apache-2.0 Imports: 7 Imported by: 0

README

Go client for Conductor

Go client for Conductor provides two sets of functions:

  1. Workflow Management APIs (start, terminate, get workflow status etc.)
  2. Worker execution framework

Prerequisites

Go must be installed and GOPATH env variable set. Directory $GOPATH/src/conductor must not be in use.

Install

./install.sh

This will create a Go project under $GOPATH/src/conductor and download any dependencies. It can then be ran:

go run $GOPATH/src/conductor/startclient/startclient.go

Install and Run

./install_and_run.sh

This will create a Go project under $GOPATH/src/conductor and download any dependencies. In addition, it will run the go application listed under startclient/startclient.go

Uninstall

WARNING: This will simply remove the $GOPATH/src/conductor directory where it has installed so if other work is there, it will be deleted. Use with caution.

./uninstall.sh

Using Workflow Management API

Go struct ConductorHttpClient provides client API calls to the conductor server to start and manage workflows and tasks.

Example
package main

import (
    "conductor"
)

func main() {
    conductorClient := conductor.NewConductorHttpClient("http://localhost:8080")
    
    // Example API that will print out workflow definition meta
    conductorClient.GetAllWorkflowDefs()
}

Task Worker Execution

Task Worker execution APIs facilitates execution of a task worker using go. The API provides necessary tools to poll for tasks at a specified interval and executing the go worker in a separate goroutine.

Example

The following go code demonstrates workers for tasks "task_1" and "task_2".

package task

import (
    "fmt"
)

// Implementation for "task_1"
func Task_1_Execution_Function(t *task.Task) (taskResult *task.TaskResult, err error) {
    log.Println("Executing Task_1_Execution_Function for", t.TaskType)

    //Do some logic
    taskResult = task.NewTaskResult(t)
    
    output := map[string]interface{}{"task":"task_1", "key2":"value2", "key3":3, "key4":false}
    taskResult.OutputData = output
    taskResult.Status = "COMPLETED"
    err = nil

    return taskResult, err
}

// Implementation for "task_2"
func Task_2_Execution_Function(t *task.Task) (taskResult *task.TaskResult, err error) {
    log.Println("Executing Task_2_Execution_Function for", t.TaskType)

    //Do some logic
    taskResult = task.NewTaskResult(t)
    
    output := map[string]interface{}{"task":"task_2", "key2":"value2", "key3":3, "key4":false}
    taskResult.OutputData = output
    taskResult.Status = "COMPLETED"
    err = nil

    return taskResult, err
}

Then main application to utilize these workers

package main

import (
    "conductor"
    "conductor/task/sample"
)

func main() {
    c := conductor.NewConductorWorker("http://localhost:8080", 1, 10000)

    c.Start("task_1", sample.Task_1_Execution_Function, false)
    c.Start("task_2", sample.Task_2_Execution_Function, true)
}

Note: For the example listed above the example task implementations are in conductor/task/sample package. Real task implementations can be placed in conductor/task directory or new subdirectory.

Documentation

Overview

Copyright 2017 Netflix, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2017 Netflix, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConductorHttpClient

type ConductorHttpClient struct {
	// contains filtered or unexported fields
}

func NewConductorHttpClient

func NewConductorHttpClient(baseUrl string) *ConductorHttpClient

func (*ConductorHttpClient) AckTask

func (c *ConductorHttpClient) AckTask(taskId string, workerid string) (string, error)

func (*ConductorHttpClient) CreateWorkflowDef

func (c *ConductorHttpClient) CreateWorkflowDef(workflowDefBody string) (string, error)

func (*ConductorHttpClient) GetAllTaskDefs

func (c *ConductorHttpClient) GetAllTaskDefs() (string, error)

func (*ConductorHttpClient) GetAllTasksInQueue

func (c *ConductorHttpClient) GetAllTasksInQueue() (string, error)

func (*ConductorHttpClient) GetAllWorkflowDefs

func (c *ConductorHttpClient) GetAllWorkflowDefs() (string, error)

func (*ConductorHttpClient) GetRunningWorkflows

func (c *ConductorHttpClient) GetRunningWorkflows(workflowName string, version int, startTime float64, endTime float64) (string, error)

func (*ConductorHttpClient) GetTask

func (c *ConductorHttpClient) GetTask(taskId string) (string, error)

func (*ConductorHttpClient) GetTaskDef

func (c *ConductorHttpClient) GetTaskDef(taskDefName string) (string, error)

func (*ConductorHttpClient) GetTaskQueueSizes

func (c *ConductorHttpClient) GetTaskQueueSizes(taskNames string) (string, error)

func (*ConductorHttpClient) GetWorkflow

func (c *ConductorHttpClient) GetWorkflow(workflowId string, includeTasks bool) (string, error)

func (*ConductorHttpClient) GetWorkflowDef

func (c *ConductorHttpClient) GetWorkflowDef(workflowName string, version int) (string, error)

func (*ConductorHttpClient) PauseWorkflow

func (c *ConductorHttpClient) PauseWorkflow(workflowId string) (string, error)

func (*ConductorHttpClient) PollForTask

func (c *ConductorHttpClient) PollForTask(taskType string, workerid string) (string, error)

func (*ConductorHttpClient) RegisterTaskDefs

func (c *ConductorHttpClient) RegisterTaskDefs(taskDefsMeta string) (string, error)

func (*ConductorHttpClient) RemoveTaskFromQueue

func (c *ConductorHttpClient) RemoveTaskFromQueue(taskType string, taskId string) (string, error)

func (*ConductorHttpClient) RerunWorkflow

func (c *ConductorHttpClient) RerunWorkflow(workflowId string, rerunWorkflowRequest string) (string, error)

func (*ConductorHttpClient) RestartWorkflow

func (c *ConductorHttpClient) RestartWorkflow(workflowId string) (string, error)

func (*ConductorHttpClient) ResumeWorkflow

func (c *ConductorHttpClient) ResumeWorkflow(workflowId string) (string, error)

func (*ConductorHttpClient) SkipTaskFromWorkflow

func (c *ConductorHttpClient) SkipTaskFromWorkflow(workflowId string, taskReferenceName string, skipTaskRequestBody string) (string, error)

func (*ConductorHttpClient) StartWorkflow

func (c *ConductorHttpClient) StartWorkflow(workflowName string, version int, correlationId string, inputJson string) (string, error)

func (*ConductorHttpClient) TerminateWorkflow

func (c *ConductorHttpClient) TerminateWorkflow(workflowId string, reason string) (string, error)

func (*ConductorHttpClient) UnRegisterTaskDef

func (c *ConductorHttpClient) UnRegisterTaskDef(taskDefName string) (string, error)

func (*ConductorHttpClient) UnRegisterWorkflowDef added in v1.10.13

func (c *ConductorHttpClient) UnRegisterWorkflowDef(workflowDefName string, version int) (string, error)

func (*ConductorHttpClient) UpdateTask

func (c *ConductorHttpClient) UpdateTask(taskBody string) (string, error)

func (*ConductorHttpClient) UpdateTaskDef

func (c *ConductorHttpClient) UpdateTaskDef(taskDefMeta string) (string, error)

func (*ConductorHttpClient) UpdateWorkflowDefs

func (c *ConductorHttpClient) UpdateWorkflowDefs(workflowDefsBody string) (string, error)

type ConductorWorker

type ConductorWorker struct {
	ConductorHttpClient *ConductorHttpClient
	ThreadCount         int
	PollingInterval     int
}

func NewConductorWorker

func NewConductorWorker(baseUrl string, threadCount int, pollingInterval int) *ConductorWorker

func (*ConductorWorker) Execute

func (c *ConductorWorker) Execute(t *task.Task, executeFunction func(t *task.Task) (*task.TaskResult, error))

func (*ConductorWorker) PollAndExecute

func (c *ConductorWorker) PollAndExecute(taskType string, executeFunction func(t *task.Task) (*task.TaskResult, error))

func (*ConductorWorker) Start

func (c *ConductorWorker) Start(taskType string, executeFunction func(t *task.Task) (*task.TaskResult, error), wait bool)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL