conveyor

package module
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2021 License: MIT Imports: 10 Imported by: 0

README

conveyor

GitHub license FOSSA Status

HitCount

Maintainability Test Coverage

A go pipeline management library, supporting concurrent pipelines, with multiple nodes and joints.

TL;DR: A pipeline is a standard concurrency pattern in Go, at least in terms of use case & functionality. And there are multiple ways people create them based on their specific requirements. This project is my attempt to create a generic one that can be used in most, if not all use-cases. If you are already aware of what a pipeline is, you can move on to examples. or Implementation section.

You can also read more about pipelines from here

What does Conveyor do?

Conveyor does what the name suggests. Just like a conveyor belt, or a production-line, where there is a series of workers, working to prepare a final product. But each one of them has a specific responsibility, may be, to add a component, or to put a sticker on the product, or to wrap it in a shiny packaging. They all work on one item at a time, but once they are done with their part, they can work on the next item in the queue, while the next worker does the next thing, at the same time. Now depending on the time each operation takes, you might want to have more than one worker at any/all stages.

This is exactly what this library does. At it's core, it has node workers and joint workers. What we just discussed are nodes. While joints, as their name suggests are like the plumbing joints of a pipeline. They can distribute, or replicate incoming data across branches.

There are 3 kind of Nodes:

  1. Source Nodes: These are the nodes that are the source of data. It may be a database, a file, a data stream, an API server, or anything that can generate new bits of data. In short, create some data, and send it forward.
  2. Operation Nodes: These are the nodes that "do something" with the data. They might process/enrich/change/replace the data, based on your business logic. The example shows a simple example of squaring/adding operations. In your application, it may be making API calls to other services, doing matrix multiplications, etc. In short, take data from previous node, and send modified/new data to next node.
  3. Sink Nodes: These are supposed to be the last node(s) in the conveyor. This is where you finalise your work, may be send the final data to some other external API/stream, save it to a database/file, or just print it on console.

If we talk about Joints, there can be multiple implementation, based on your need. For now, there is a built-in joint(ReplicateJoint) that conveyor has, which replicates same data to be sent to multiple nodes at next stage. More will be added in future.

How to implement your own nodes and joints?

There are these 2 interfaces that you need to implement in your own types, for Nodes & Joints, respectively:

// NodeExecutor interface is the interface that you need to implement by your own types of nodes
type NodeExecutor interface {
	GetName() string
	GetUniqueIdentifier() string
	ExecuteLoop(ctx CnvContext, inChan <-chan map[string]interface{}, outChan chan<- map[string]interface{}) error
	Execute(ctx CnvContext, inData map[string]interface{}) (map[string]interface{}, error)
	Count() int
	CleanUp() error
}

// JointExecutor interface is the interface that you need to implement by your own types of joints
type JointExecutor interface {
	GetName() string
	GetUniqueIdentifier() string
	ExecuteLoop(ctx CnvContext, inChan []chan map[string]interface{}, outChan []chan map[string]interface{}) error
	Count() int
	InputCount() int
	OutputCount() int
}
  • GetName() returns a string to represent the name of the executor.

  • GetUniqueIdentifier() returns a unique-identifier to recognise the executor (you may just return the name).

  • Count() decides the concurrency you want to have for your node.

  • CleanUp() is called once node's job is completed. To do any cleanup activity (like closing a file/database connection, or to wait till your other go-routines finish)

If you don't want to implement all of these methods, it makes sense. There's a struct conveyor.ConcreteNodeExecutor that you can extend, which gives default implementations of these 4 methods. You can get up and running without overriding them, but for an actual application, where hopefully, you will need concurrency, don't forget to return "something > 1: from Count(). it's default value is 1. Also, always Cleanup() after yourself.

But, you must implement one of the below 2 methods, based on what you want your node to do. Their default implementation just returns ErrExecuteNotImplemented error.

  • Execute() receives a map[string]interface{} (inData) as input, and returns a map[string]interface{} as output. This is the method that you must implement based on what you want your node to do. This is the method gets called, if you have added your node to work in "Transaction Mode" (check example for more on that). In most cases, you can just get the most out of this method. Here we create a new Go-routine for each request, and number of go-routines running at a given time, is decided by Count(). If your Execute() for source node returns an error ErrSourceExhausted, conveyor will assume that it's time to finish up, and will stop reading any more data, and will gracefully shutdown after processing already read data. Generally, if any node returns a non-nil error, that particular unit of data would be dropped, and won't be sent to the next node.

  • ExecuteLoop() gets called if your Executor has been added to work in "Loop Mode". It receives 2 channels (inChan & outChan) having map[string]interface{}. You will be needed to run a for loop, to read inputs from inChan, and write it to outChan It is supposed to be a blocking function, but the loop should exit once you are done. If there are 10 instances of this function running, and all of them return, Conveyor will take that as a signal to shutdown, the node, and the ones that come next to it. As a rule of thumb, always let your source dictate when to finish up, all other will automatically follow it's lead.

Why did I go for the approach of "Implementing an interface", and not "Writing a function" for each node, like the one mentioned here ?

A function is what I had started with, but soon realised that if I am going to do anything flexible, I need something more than a single function. For example, I might want to run a SQL query while creating a node, and fetch the results inside the source node, before starting the conveyor machinery (say, inside a func New MySQLSource()), and then with each call of Execute(), I would just return the next entry. Or if I am using ExecuteLoop() I can just keep writing to the output channel inside a for loop.

I couldn't do it elegantly using just a function. Also the Count() lets each node decide, how many concurrent go-routines should run for it. Which brings us to the next important gotcha- Always keep your Execute() & ExecuteLoop() methods race-free. Don't modify any of the receiver struct's parameters, inside these 2 functions. Just work with the incoming data, and stream the output to the next channel(s) If your read operation(for source), isn't race-free (eg. reading from a file), consider using a single go-routine for source, and more for heavy lifting in later nodes.

Monitoring, Logging, Progress tracking, and Timeout/Killing.

This is what I believe, is a good to have for anything that solves real world problems. Continue with this section, when you are really building something with Conveyor, and have run/gone through basic examples.

If you have noticed, the first argument in both Execute() & ExecuteLoop() is an interface ctx CnvContext. There is already a default implementation, that you get when you create a new conveyor instance, using:

  • NewConveyor()

You can also implement your own, and pass it while creating a conveyor using:

  • NewConveyor().SetCustomContext(ctx)

This is all that you can do with this Context, with default implementation:

  • Logging: Conveyor doesn't write it's internal logs, rather it keeps publishing the logs to a channel that you can read from. by calling conveyorInstance.Logs(), you can just keep a go-routine running, to print/log those messages. This way, you can use your own logging library, in place of getting stuck with the one of my choice.

  • Monitoring: If your implementation wants to convey some status messages like Running MySQL Query, Processed 5 batches of 100 requests, etc, just call ctx.SendStatus("Started doing something cool") to publish these status messages. You can read them just like logs, by reading from channel given by conveyorInstance.Status().

  • Killing: If you want to kill your conveyor, before your source has done reading the input, just call either conveyorInstance.Stop() or ctx.Cancel() (whichever is handy). It will signal all the internal go-routines to shutdown. However, if you have started your own go-routines running long loops, or if you are using ExecuteLoop(), you will have to keep monitoring ctx.Done() frequently, to make sure that you don't leak out your go-routines, when you kill a conveyor. This small stub will do that for you:

    select {
     case <-ctx.Done():
        break someLoop // or return
     default:
     }
    
  • Timeout: If you want your conveyor to be killed if it's not done within a fixed time, then use:

    cnv, err := NewConveyor()
    cnv.SetTimeout(timeout)
    

    But remember, if you want to use a custom context, you should set that first, and then set this timeout.

     cnv, err := NewConveyor()
     cnv.SetCustomContext(ctx).SetTimeout(timeout)
    
  • Progress: If you want to see what % of work is completed, use EnableProgress()to set expected runtime & to enable progress.

     cnv, err := NewConveyor()
     cnv.EnableProgress(expectedDuration)
    

    To get the progress information, once conveyor is started, just read from channel given by conveyorInstance.Progress(). It decides progress based on a estimate that you provide as expectedDuration. So if you have predicted 2 hour, it will be at 50% after an hour And if it's still not done after 2 hours, it will just wait at 99%, unless complete.

Needs more work: Working with a distributed conveyor-based application

To go further, you need to know about Conveyor Life Cycle Handling. Conveyor was made to work with API based applications, that might be running on multiple servers, and we might need to retain the data even after a restart. In such scenarios, you might actually want to store state, status messages, progress in some persistent manner, so that you can read that information on any server (even if a particular conveyor isn't running on that particular server)

To create a conveyor based application that can work across a distributed system, you will need to provide an implementation of the below interface, where you can fetch/store data to/from your choice of database/cache

// LifeCycleHandler handles conveyor start/stop
type LifeCycleHandler interface {
	GetState() (string, error)
	GetStatusMsg() (string, error)
	UpdateStatusMsg(string) error
	GetProgress() (string, error)
	UpdateProgress(string) error

	MarkPreparing() error
	MarkStarted() error
	MarkToKill() error
	MarkKilled() error
	MarkFinished() error
	MarkError() error
}

In the implementation, that I use, in one of my applications, I store these details on a redis cluster. In future, I do plan to simplify it a bit, and maybe, provide an in-built globas hash-based implementation for those who want to run a single server application.

For now, you can just use a map, to store these values in-memory, for single server applications.

Then, you can call conveyorInstance.MarkCurrentState(x) to change the state, where x can be one of the conveyor.Status** values.

It's currently a pretty new project, and I am open to new ideas and suggestions. Goes without saying, issues/contributions are welcomed as well.

Documentation

Index

Constants

View Source
const (
	// StatusPreparing status is used to mark a conveyor to be in "preparing" state
	StatusPreparing = "preparing"

	// StateStarted status is used to mark a conveyor to be in "started" state
	StateStarted = "started"

	// StateToKill status is used to mark a conveyor has been setup "to be killed", but isn't yet dead
	StateToKill = "toKill"

	// StateKilled status is used to mark a conveyor as "successfully killed"
	StateKilled = "killed"

	// StateFinished status is used to mark a conveyor as "successfully finished"
	StateFinished = "finished"

	// StateInternalError status is used to indicate that conveyor couldn't finish due to some internal error
	StateInternalError = "internalError"
)

Valid States for a Conveyor

View Source
const (
	// WorkerTypeSource constant
	WorkerTypeSource = "SOURCE_WORKER"
	// WorkerTypeOperation constant
	WorkerTypeOperation = "OPERATION_WORKER"
	// WorkerTypeSink constant
	WorkerTypeSink = "SINK_WORKER"
	// WorkerTypeJoint constant
	WorkerTypeJoint = "JOINT_WORKER"
)
View Source
const (
	// WorkerModeTransaction is the worker mode in which executor just needs to implement Execute(ctx) method
	// and doesn't need to handle worker's channel or shutdown of worker.
	// But executor should still monitor ctx.Done() to shutdown or cleanup/close any files/connections, etc that it opens.
	// This one helps keep the executor code leaner & simple
	// This mode is useful for most of the cases, including executors that do file I/O, database lookup, remote API call
	WorkerModeTransaction = WorkerMode(iota + 10)

	// WorkerModeLoop is the worker mode in which executor just needs to implement ExecuteLoop(ctx, inChan, outChan) method
	// and has to handle the copying of data from/to channels (except, closing them), executor will also need to ensure
	// that it monitors ctx.Done() to shutdown worker, in case of any error.
	// Needs more code, use only if you are ready to peek into how it works.
	// Some use cases are, where you can't fetch data on-demand with a function call. Eg. Running an API server as source
	WorkerModeLoop
)

Variables

View Source
var (
	// ErrInvalidWorkerType error
	ErrInvalidWorkerType = errors.New("Invalid worker type. pick one from conveyor.WorkerTypeSource/conveyor.WorkerTypeOperation/conveyor.WorkerTypeSink")

	// ErrInvalidWorkerMode error
	ErrInvalidWorkerMode = errors.New("Invalid worker mode. pick either conveyor.WorkerModeTransaction or conveyor.WorkerModeLoop")

	// ErrNoNodesAvailable error
	ErrNoNodesAvailable = errors.New("Your action assumes presence of Node Executors in conveyor, but none were found")

	// ErrNoJointsAvailable error
	ErrNoJointsAvailable = errors.New("Your action assumes presence of Joint Executors in conveyor, but none were found")

	// ErrExecuteNotImplemented error
	ErrExecuteNotImplemented = errors.New("This executor doesn't implement Execute() method")

	// ErrExecuteLoopNotImplemented error
	ErrExecuteLoopNotImplemented = errors.New("This executor doesn't implement ExecuteLoop() method")

	// ErrInputChanDoesNotExist error
	ErrInputChanDoesNotExist = errors.New("input channel doesn't exist for this node")
	// ErrOutputChanDoesNotExist error
	ErrOutputChanDoesNotExist = errors.New("output channel doesn't exist for this node")

	// ErrSourceExhausted error
	ErrSourceExhausted = errors.New("Source executor is exhausted")
	// ErrSourceInternal error
	ErrSourceInternal = errors.New("Source executor internal error")

	// ErrFetchRejected error
	ErrFetchRejected = errors.New("Fetch executor rejected the transaction")
	// ErrFetchInternal error
	ErrFetchInternal = errors.New("Fetch executor internal error")

	// ErrSinkRejected error
	ErrSinkRejected = errors.New("Sink executor rejected data")
	// ErrSinkInternal error
	ErrSinkInternal = errors.New("Sink executor internal error")

	// ErrLessInputChannelsInJoint error
	ErrLessInputChannelsInJoint = errors.New("JointWorker doesn't have enough input channels")

	// ErrLessOutputChannelsInJoint error
	ErrLessOutputChannelsInJoint = errors.New("JointWorker doesn't have enough output channels")

	// ErrLifeCycleNotSupported error
	ErrLifeCycleNotSupported = errors.New("This conveyor instance isn't created with Life Cycle Support")
)
View Source
var (
	// ErrNoInputChannel error
	ErrNoInputChannel = errors.New("number of input channels is 0")

	// ErrNoOutputChannel error
	ErrNoOutputChannel = errors.New("number of output channels is 0")

	// ErrMultipleInputChannels error
	ErrMultipleInputChannels = errors.New("only one input channel can be replicated")

	// ErrOneToOneConnection error
	ErrOneToOneConnection = errors.New("replicate joint isn't needed for one-to one mapping, " +
		"you can just link the nodes directly")
)
View Source
var (
	// ErrEmptyConveyor error
	ErrEmptyConveyor = errors.New("conveyor is empty, no workers employed")
)

Functions

func LinkJointAfterNode added in v1.0.2

func LinkJointAfterNode(nw NodeWorker, jw JointWorker, index int) error

LinkJointAfterNode links JointWorker after NodeWorkers, maps input channel of joint worker on output channel of node worker

func LinkNodeAfterJoint added in v1.0.2

func LinkNodeAfterJoint(jw JointWorker, nw NodeWorker) error

LinkNodeAfterJoint links NodeWorkers after JointWorker, maps input channel of a b on output channel of a

func LinkWorker2Worker

func LinkWorker2Worker(a NodeWorker, b NodeWorker) error

LinkWorker2Worker links two NodeWorkers, maps input channel of a b on output channel of a

Types

type CnvContext

type CnvContext interface {
	context.Context
	WithCancel() CnvContext
	WithTimeout(time.Duration) CnvContext
	Cancel()
	SendLog(int32, string, error)
	SendStatus(string)
	GetData() interface{}
}

CnvContext is an interface, which is satisfied by CnvContext. This interface is primarily to enabling mocking for unit-testing.CnvContextAble Or may be, something fancy that you might want to do.

type ConcreteJointExecutor added in v1.0.1

type ConcreteJointExecutor struct {
	Name string
}

ConcreteJointExecutor struct represents a concrete node structure

func (*ConcreteJointExecutor) Count added in v1.0.1

func (cjh *ConcreteJointExecutor) Count() int

Count returns the number of executors required for joint

func (*ConcreteJointExecutor) GetName added in v1.0.2

func (cjh *ConcreteJointExecutor) GetName() string

GetName returns the name of the executor

func (*ConcreteJointExecutor) GetUniqueIdentifier added in v1.0.2

func (cjh *ConcreteJointExecutor) GetUniqueIdentifier() string

GetUniqueIdentifier can be used to fetch a unique string identifying the executor

func (*ConcreteJointExecutor) InputCount added in v1.0.1

func (cjh *ConcreteJointExecutor) InputCount() int

InputCount returns the number of executors required

func (*ConcreteJointExecutor) OutputCount added in v1.0.1

func (cjh *ConcreteJointExecutor) OutputCount() int

OutputCount returns the number of executors required

type ConcreteJointWorker

type ConcreteJointWorker struct {
	*WPool
	Executor JointExecutor
}

ConcreteJointWorker to run different joints

func (*ConcreteJointWorker) Start

func (wp *ConcreteJointWorker) Start()

Start the worker

type ConcreteNodeExecutor added in v1.0.1

type ConcreteNodeExecutor struct {
	Name string
	Data interface{}
}

ConcreteNodeExecutor struct represents a concrete node structure, you should compose it into your node structures

func (*ConcreteNodeExecutor) CleanUp added in v1.0.1

func (cnh *ConcreteNodeExecutor) CleanUp() error

CleanUp does any cleanup if needed after executors are done

func (*ConcreteNodeExecutor) Count added in v1.0.1

func (cnh *ConcreteNodeExecutor) Count() int

Count returns the number of executors required

func (*ConcreteNodeExecutor) Execute added in v1.0.2

func (cnh *ConcreteNodeExecutor) Execute(ctx CnvContext, inData map[string]interface{}) (map[string]interface{}, error)

Execute should take a "map[string]interface{}" as input and returns a map[string]interface{}" as output Ideally it should process the input and either return a new map, or just add it's own keys to it, if we want to retain the data from previous node. This base implementation, just returns an error, so you need to override it with your own. Any struct may define both Execute & ExecuteLoop.Execute Execute will be used if mode is set to conveyor.WorkerModeTransaction

func (*ConcreteNodeExecutor) ExecuteLoop added in v1.0.2

func (cnh *ConcreteNodeExecutor) ExecuteLoop(ctx CnvContext, inChan <-chan map[string]interface{}, outChan chan<- map[string]interface{}) error

ExecuteLoop should take two "map[string]interface{}" channels. It is a more hands-on version of Execute() method, where you have to handle reading from input channel and writing to output channel, after processing on your own This base implementation, just returns an error, so you need to override it with your own. Any struct may define both Execute & ExecuteLoop. ExecuteLoop will be used if mode is set to conveyor.WorkerModeLoop

func (*ConcreteNodeExecutor) GetName added in v1.0.2

func (cnh *ConcreteNodeExecutor) GetName() string

GetName returns the name of the executor

func (*ConcreteNodeExecutor) GetUniqueIdentifier added in v1.0.2

func (cnh *ConcreteNodeExecutor) GetUniqueIdentifier() string

GetUniqueIdentifier can be used to fetch a unique string identifying the executor

type ConcreteNodeWorker

type ConcreteNodeWorker struct {
	*WPool
	WorkerCount int
	Mode        WorkerMode
	Executor    NodeExecutor
}

ConcreteNodeWorker to run different nodes

func (*ConcreteNodeWorker) CreateChannels added in v1.0.2

func (cnw *ConcreteNodeWorker) CreateChannels(buffer int)

CreateChannels creates channels for the worker

func (*ConcreteNodeWorker) Start

func (cnw *ConcreteNodeWorker) Start()

Start the worker

func (*ConcreteNodeWorker) WaitAndStop added in v1.0.2

func (cnw *ConcreteNodeWorker) WaitAndStop(ctx CnvContext) error

WaitAndStop ConcreteNodeWorker

type Conveyor

type Conveyor struct {
	Name string
	// contains filtered or unexported fields
}

Conveyor is base

func NewConveyor added in v1.0.2

func NewConveyor(name string, bufferLen int) (*Conveyor, error)

NewConveyor creates a new Conveyor instance, with all options set to default values/implementations

func (*Conveyor) AddJointExecutor added in v1.0.2

func (cnv *Conveyor) AddJointExecutor(jointExecutor JointExecutor) error

AddJointExecutor creates a worker for a given executor (based on workerMode & workerType) And then links it to the last "Node" added to the conveyor, by creating and mapping connecting channels In case there was no node added previously, it skips the linking part

func (*Conveyor) AddJointExecutorAfterNode added in v1.0.2

func (cnv *Conveyor) AddJointExecutorAfterNode(jointExecutor JointExecutor, workerMode WorkerMode, workerType string) error

AddJointExecutorAfterNode creates a worker for a given executor (based on workerMode & workerType) And then links it to the last "Joint" added to the conveyor, by creating and mapping connecting channels In case there was no "Joint" added previously, it returns an error

func (*Conveyor) AddJointWorker

func (cnv *Conveyor) AddJointWorker(joint JointWorker) error

AddJointWorker employs a new joint station to the conveyor

func (*Conveyor) AddNodeExecutor added in v1.0.2

func (cnv *Conveyor) AddNodeExecutor(nodeExecutor NodeExecutor, workerMode WorkerMode, workerType string) error

AddNodeExecutor creates a worker for a given executor (based on workerMode & workerType) And then links it to the last "Node" added to the conveyor, by creating and mapping connecting channels In case there was no node added previously, it skips the linking part

func (*Conveyor) AddNodeExecutorAfterJoint added in v1.0.2

func (cnv *Conveyor) AddNodeExecutorAfterJoint(nodeExecutor NodeExecutor, workerMode WorkerMode, workerType string) error

AddNodeExecutorAfterJoint creates a worker for a given executor (based on workerMode & workerType) And then links it to the last "Joint" added to the conveyor, by creating and mapping connecting channels In case there was no "Joint" added previously, it returns an error

func (*Conveyor) AddNodeWorker added in v1.0.2

func (cnv *Conveyor) AddNodeWorker(worker NodeWorker, toLink bool) error

AddNodeWorker employs a new worker station to the conveyor

func (*Conveyor) Done added in v1.0.2

func (cnv *Conveyor) Done() <-chan struct{}

Done returns the context.Done() channel of Conveyor

func (*Conveyor) EnableProgress added in v1.0.2

func (cnv *Conveyor) EnableProgress(expectedDuration time.Duration) *Conveyor

EnableProgress sets the expectedDuration of Conveyor to a given value. Also enables progress based on this value of expectedDuration Will have no effect, once you add your first node

func (*Conveyor) GetConveyorContext added in v1.0.2

func (cnv *Conveyor) GetConveyorContext() CnvContext

GetConveyorContext gives the conveyor's context object

func (*Conveyor) GetLastWorker

func (cnv *Conveyor) GetLastWorker() (NodeWorker, error)

GetLastWorker returns the last added worker, or error if conveyor is empty

func (*Conveyor) Logs

func (cnv *Conveyor) Logs() <-chan Message

Logs returns a channel on which Conveyor Statuses will be published

func (*Conveyor) MarkCurrentState added in v1.0.2

func (cnv *Conveyor) MarkCurrentState(state string) error

MarkCurrentState marks the current stage of conveyor using internal life-cycle handler interface

func (*Conveyor) Progress

func (cnv *Conveyor) Progress() <-chan float64

Progress returns a channel which is regularly updated with progress %

func (*Conveyor) SetCustomContext added in v1.0.2

func (cnv *Conveyor) SetCustomContext(ctx CnvContext) *Conveyor

SetCustomContext sets the conveyor's CnvContext interface to a given implementation Will have no effect, once you add your first node This method must be called before you call "SetTimeout()"

func (*Conveyor) SetID added in v1.0.2

func (cnv *Conveyor) SetID(id string) *Conveyor

SetID sets the id of Conveyor to a given string Will have no effect, once you add your first node

func (*Conveyor) SetLifeCycleHandler added in v1.0.2

func (cnv *Conveyor) SetLifeCycleHandler(lch LifeCycleHandler) *Conveyor

SetLifeCycleHandler sets the conveyor's LifeCycleHandler interface to a given implementation Will have no effect, once you add your first node

func (*Conveyor) SetTimeout added in v1.0.2

func (cnv *Conveyor) SetTimeout(timeout time.Duration) *Conveyor

SetTimeout sets the timeout of Conveyor to a given value Will have no effect, once you add your first node If you change the context using "SetCustomContext()" after calling this method, timeout will get reset

func (*Conveyor) Start

func (cnv *Conveyor) Start() error

Start the Conveyor

func (*Conveyor) Status

func (cnv *Conveyor) Status() <-chan string

Status returns a channel on which Conveyor Statuses will be published

func (*Conveyor) Stop

func (cnv *Conveyor) Stop() time.Duration

Stop Conveyor by cancelling context. It's used to kill a pipeline while it's running. No need to call it if the pipeline is finishing on it's own

type CtxData

type CtxData struct {
	Name string
	// contains filtered or unexported fields
}

CtxData stores the information that is stored inside a conveyor, useful for it's lifecycle.ConveyorData Any fields only useful for initialization shouldn't be here

type JointExecutor added in v1.0.1

type JointExecutor interface {
	GetName() string
	GetUniqueIdentifier() string
	ExecuteLoop(ctx CnvContext, inChan []chan map[string]interface{}, outChan []chan map[string]interface{}) error
	Count() int
	InputCount() int
	OutputCount() int
}

JointExecutor interface is the interface that you need to implement in your own types of joints

type JointWorker

type JointWorker interface {
	Start(ctx CnvContext) error
	WaitAndStop() error
	CreateChannels(int)
	SetInputChannels([]chan map[string]interface{}) error
	SetOutputChannels([]chan map[string]interface{}) error
	GetInputChannels() ([]chan map[string]interface{}, error)
	GetOutputChannels() ([]chan map[string]interface{}, error)
	AddInputChannel(chan map[string]interface{}) error
	AddOutputChannel(chan map[string]interface{}) error
}

JointWorker interface binds to nodes that have the capability to fetch intermidiate data, and forward it to next node

func NewJointWorkerPool

func NewJointWorkerPool(executor JointExecutor) JointWorker

NewJointWorkerPool creates a new OperationWorkerPool

type JointWorkerPool

type JointWorkerPool struct {
	*ConcreteJointWorker
	// contains filtered or unexported fields
}

JointWorkerPool struct provides the worker pool infra for Joint interface, that act as connections between nodes

func (*JointWorkerPool) AddInputChannel

func (jwp *JointWorkerPool) AddInputChannel(inChan chan map[string]interface{}) error

AddInputChannel maps a slice of channels on the join't outupt channels

func (*JointWorkerPool) AddOutputChannel

func (jwp *JointWorkerPool) AddOutputChannel(outChan chan map[string]interface{}) error

AddOutputChannel maps a slice of channels on the join't outupt channels

func (*JointWorkerPool) CreateChannels added in v1.0.2

func (jwp *JointWorkerPool) CreateChannels(buffer int)

CreateChannels creates channels for the joint worker

func (*JointWorkerPool) GetInputChannels

func (jwp *JointWorkerPool) GetInputChannels() ([]chan map[string]interface{}, error)

GetInputChannels returns the input channel of Joint WorkerPool

func (*JointWorkerPool) GetOutputChannels

func (jwp *JointWorkerPool) GetOutputChannels() ([]chan map[string]interface{}, error)

GetOutputChannels returns the output channel of Joint WorkerPool

func (*JointWorkerPool) SetInputChannels

func (jwp *JointWorkerPool) SetInputChannels(inChans []chan map[string]interface{}) error

SetInputChannels updates the input channel of Joint WorkerPool

func (*JointWorkerPool) SetOutputChannels

func (jwp *JointWorkerPool) SetOutputChannels(outChans []chan map[string]interface{}) error

SetOutputChannels updates the output channel of Joint WorkerPool

func (*JointWorkerPool) Start

func (jwp *JointWorkerPool) Start(ctx CnvContext) error

Start JoinWorkerPool

func (*JointWorkerPool) WaitAndStop

func (jwp *JointWorkerPool) WaitAndStop() error

WaitAndStop JointWorkerPool

type LifeCycleHandler added in v1.0.2

type LifeCycleHandler interface {
	ProgressUpdater
	StateUpdater
}

LifeCycleHandler handles conveyor start/stop

type LocalLifeCycleHandler added in v1.0.2

type LocalLifeCycleHandler struct {
}

type Message

type Message struct {
	Text     string
	LogLevel int32
}

Message struct stores one unit of message that conveyor passed back for logging

type NodeExecutor added in v1.0.1

type NodeExecutor interface {
	GetName() string
	GetUniqueIdentifier() string
	ExecuteLoop(ctx CnvContext, inChan <-chan map[string]interface{}, outChan chan<- map[string]interface{}) error
	Execute(ctx CnvContext, inData map[string]interface{}) (map[string]interface{}, error)
	Count() int
	CleanUp() error
}

NodeExecutor interface is the interface that you need to implement in your own types of nodes

type NodeWorker

type NodeWorker interface {
	Start(ctx CnvContext) error
	WaitAndStop(ctx CnvContext) error
	CreateChannels(int)
	WorkerType() string
	SetInputChannel(chan map[string]interface{}) error
	SetOutputChannel(chan map[string]interface{}) error
	GetInputChannel() (chan map[string]interface{}, error)
	GetOutputChannel() (chan map[string]interface{}, error)
}

NodeWorker interface binds to nodes that have the capability to fetch intermediate data, and forward it to next node

func NewOperationWorkerPool added in v1.0.2

func NewOperationWorkerPool(executor NodeExecutor, mode WorkerMode) NodeWorker

NewOperationWorkerPool creates a new OperationWorkerPool

func NewSinkWorkerPool

func NewSinkWorkerPool(executor NodeExecutor, mode WorkerMode) NodeWorker

NewSinkWorkerPool creates a new SinkWorkerPool

func NewSourceWorkerPool

func NewSourceWorkerPool(executor NodeExecutor, mode WorkerMode) NodeWorker

NewSourceWorkerPool creates a new SourceWorkerPool

type OperationNode added in v1.0.2

type OperationNode struct {
	Pool *OperationWorkerPool
}

OperationNode structue

type OperationWorkerPool added in v1.0.2

type OperationWorkerPool struct {
	*ConcreteNodeWorker
	// contains filtered or unexported fields
}

OperationWorkerPool struct provides the worker pool infra for Operation interface

func (*OperationWorkerPool) CreateChannels added in v1.0.2

func (fwp *OperationWorkerPool) CreateChannels(buffer int)

CreateChannels creates channels for the Operation WorkerPool

func (*OperationWorkerPool) GetInputChannel added in v1.0.2

func (fwp *OperationWorkerPool) GetInputChannel() (chan map[string]interface{}, error)

GetInputChannel returns the input channel of Operation WorkerPool

func (*OperationWorkerPool) GetOutputChannel added in v1.0.2

func (fwp *OperationWorkerPool) GetOutputChannel() (chan map[string]interface{}, error)

GetOutputChannel returns the output channel of Operation WorkerPool

func (*OperationWorkerPool) SetInputChannel added in v1.0.2

func (fwp *OperationWorkerPool) SetInputChannel(inChan chan map[string]interface{}) error

SetInputChannel updates the input channel of Operation WorkerPool

func (*OperationWorkerPool) SetOutputChannel added in v1.0.2

func (fwp *OperationWorkerPool) SetOutputChannel(outChan chan map[string]interface{}) error

SetOutputChannel updates the output channel of Operation WorkerPool

func (*OperationWorkerPool) Start added in v1.0.2

func (fwp *OperationWorkerPool) Start(ctx CnvContext) error

Start Operation Worker Pool

func (*OperationWorkerPool) WaitAndStop added in v1.0.2

func (fwp *OperationWorkerPool) WaitAndStop(ctx CnvContext) error

WaitAndStop OperationWorkerPool

func (*OperationWorkerPool) WorkerType added in v1.0.2

func (fwp *OperationWorkerPool) WorkerType() string

WorkerType returns the type of worker

type ProgressUpdater added in v1.0.2

type ProgressUpdater interface {
	GetState() (string, error)
	GetStatusMsg() (string, error)
	UpdateStatusMsg(string) error
	GetProgress() (string, error)
	UpdateProgress(string) error
}

type ReplicateJoint added in v1.0.2

type ReplicateJoint struct {
	*ConcreteJointExecutor
	OutChanCount int
}

ReplicateJoint is a plumbing joint that connects a source/operation node to multiple operation/sink nodes

func NewReplicateJoint added in v1.0.2

func NewReplicateJoint(name string, outChanCount int) (*ReplicateJoint, error)

NewReplicateJoint creates a new joint to replicate same data to multiple channels

func NewReplicateJointWithContext added in v1.0.2

func NewReplicateJointWithContext(cnvCtx CnvContext, name string, outChanCount int) (*ReplicateJoint, error)

NewReplicateJointWithContext creates a new joint (with context) to replicate same data to multiple channels

func (*ReplicateJoint) ExecuteLoop added in v1.0.2

func (rj *ReplicateJoint) ExecuteLoop(cnvCtx CnvContext, inChans []chan map[string]interface{}, outChans []chan map[string]interface{}) error

ExecuteLoop method produces data for other nodes from inputChannel file, and broadcasts copies of this data on all of it's output channels

func (*ReplicateJoint) OutputCount added in v1.0.2

func (rj *ReplicateJoint) OutputCount() int

OutputCount returns the number of executors required

type SinkWorkerPool

type SinkWorkerPool struct {
	*ConcreteNodeWorker
	// contains filtered or unexported fields
}

SinkWorkerPool struct provides the worker pool infra for Sink interface

func (*SinkWorkerPool) CreateChannels added in v1.0.2

func (swp *SinkWorkerPool) CreateChannels(buffer int)

CreateChannels creates channels for the sink worker

func (*SinkWorkerPool) GetInputChannel

func (swp *SinkWorkerPool) GetInputChannel() (chan map[string]interface{}, error)

GetInputChannel returns the input channel of Sink WorkerPool

func (*SinkWorkerPool) GetOutputChannel

func (swp *SinkWorkerPool) GetOutputChannel() (chan map[string]interface{}, error)

GetOutputChannel returns the output channel of Sink WorkerPool

func (*SinkWorkerPool) SetInputChannel

func (swp *SinkWorkerPool) SetInputChannel(inChan chan map[string]interface{}) error

SetInputChannel updates the input channel of Sink WorkerPool

func (*SinkWorkerPool) SetOutputChannel

func (swp *SinkWorkerPool) SetOutputChannel(outChan chan map[string]interface{}) error

SetOutputChannel updates the output channel of Sink WorkerPool

func (*SinkWorkerPool) Start

func (swp *SinkWorkerPool) Start(ctx CnvContext) error

Start Sink Worker Pool

func (*SinkWorkerPool) WaitAndStop

func (swp *SinkWorkerPool) WaitAndStop(ctx CnvContext) error

WaitAndStop SinkWorkerPool

func (*SinkWorkerPool) WorkerType

func (swp *SinkWorkerPool) WorkerType() string

WorkerType returns the type of worker

type SourceWorkerPool

type SourceWorkerPool struct {
	*ConcreteNodeWorker
	// contains filtered or unexported fields
}

SourceWorkerPool struct provides the worker pool infra for Source interface

func (*SourceWorkerPool) GetInputChannel

func (swp *SourceWorkerPool) GetInputChannel() (chan map[string]interface{}, error)

GetInputChannel returns the input channel of Source WorkerPool

func (*SourceWorkerPool) GetOutputChannel

func (swp *SourceWorkerPool) GetOutputChannel() (chan map[string]interface{}, error)

GetOutputChannel returns the output channel of Source WorkerPool

func (*SourceWorkerPool) SetInputChannel

func (swp *SourceWorkerPool) SetInputChannel(inChan chan map[string]interface{}) error

SetInputChannel updates the input channel of Source WorkerPool

func (*SourceWorkerPool) SetOutputChannel

func (swp *SourceWorkerPool) SetOutputChannel(outChan chan map[string]interface{}) error

SetOutputChannel updates the output channel of Source WorkerPool

func (*SourceWorkerPool) Start

func (swp *SourceWorkerPool) Start(ctx CnvContext) error

Start Source Worker Pool

func (*SourceWorkerPool) WaitAndStop

func (swp *SourceWorkerPool) WaitAndStop(ctx CnvContext) error

WaitAndStop SourceWorkerPool

func (*SourceWorkerPool) WorkerType

func (swp *SourceWorkerPool) WorkerType() string

WorkerType returns the type of worker

type StateUpdater added in v1.0.2

type StateUpdater interface {
	MarkPreparing() error
	MarkStarted() error
	MarkToKill() error
	MarkKilled() error
	MarkFinished() error
	MarkError() error
}

type WPool

type WPool struct {
	Name string
	Wg   sync.WaitGroup
	// contains filtered or unexported fields
}

WPool to run different nodes of comex graph

func (*WPool) Wait

func (wp *WPool) Wait()

Wait for worker to finish

type WorkerMode added in v1.0.2

type WorkerMode uint8

WorkerMode decides if worker would run in loop mode or single transaction mode

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL