modules

package
v0.0.5
Published: Jan 13, 2024 License: MIT Imports: 8 Imported by: 21

README

Modules

Flow-based programming is the foundation of the SDK. The SDK provides a set of asynchronous modules that can be combined into a workflow. Each module implements a function that has N inputs and M outputs (N, M >= 0) and executes asynchronously. In this implementation inputs are untyped, so each module has to check the type of every message it receives.
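Because inputs are untyped, a module usually starts its message handling with a type switch. The following is a minimal sketch of that check, not code from the SDK: the function name describe and the plain channel standing in for an input are assumptions made for illustration.

```go
package main

import "fmt"

// describe is a hypothetical handler. It shows how a module might inspect
// the dynamic type of a message received from an untyped input.
func describe(msg any) string {
	switch v := msg.(type) {
	case string:
		return "string: " + v
	case int:
		return fmt.Sprintf("int: %d", v)
	default:
		// Unknown types are reported instead of being silently dropped.
		return fmt.Sprintf("unsupported type %T", v)
	}
}

func main() {
	in := make(chan any, 3) // stands in for the channel behind an input
	in <- "block"
	in <- 42
	in <- 3.14
	close(in)

	for msg := range in {
		fmt.Println(describe(msg))
	}
}
```

Whether an unexpected type should be logged, skipped, or treated as an error is a per-module decision; the sketch only reports it.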

Interface

A module has to implement the Module interface:

type Module interface {
	io.Closer

	Name() string
	Start(ctx context.Context)
	Input(name string) (*Input, error)
	MustInput(name string) *Input
	Output(name string) (*Output, error)
	MustOutput(name string) *Output
	AttachTo(output Module, outputName, inputName string) error
}

The interface contains the following methods:

  • Start - initializes the module state and starts waiting asynchronously for messages on the inputs.
  • Close - gracefully stops all module activity (inherited from the io.Closer interface).
  • Name - returns the name of the module, which is used during workflow construction.
  • Input - returns an input by its name.
  • MustInput - like Input, but returns only the input, without an error value.
  • Output - returns an output by its name.
  • MustOutput - like Output, but returns only the output, without an error value.
  • AttachTo - connects the output named outputName of the passed module to this module's input named inputName.
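The lifecycle described by Start and Close can be sketched with plain channels. This is an illustrative sketch under stated assumptions, not the SDK's implementation: the doubler type, its in and out channels, and the runDoubler helper are all hypothetical, and the sketch assumes the caller cancels the context before calling Close.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// doubler is a hypothetical module with one input and one output channel.
type doubler struct {
	in  chan any
	out chan any
	wg  sync.WaitGroup
}

func newDoubler() *doubler {
	return &doubler{in: make(chan any), out: make(chan any)}
}

// Start launches the listening goroutine and returns immediately.
func (d *doubler) Start(ctx context.Context) {
	d.wg.Add(1)
	go func() {
		defer d.wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case msg := <-d.in:
				// Inputs are untyped, so non-int messages are skipped.
				if n, ok := msg.(int); ok {
					d.out <- n * 2
				}
			}
		}
	}()
}

// Close waits for the goroutine to exit (assumes ctx is already cancelled).
func (d *doubler) Close() error {
	d.wg.Wait()
	return nil
}

// runDoubler feeds msgs through a doubler and collects the results.
func runDoubler(msgs ...any) []int {
	ctx, cancel := context.WithCancel(context.Background())
	d := newDoubler()
	d.Start(ctx)

	var results []int
	for _, msg := range msgs {
		d.in <- msg
		if _, ok := msg.(int); ok {
			results = append(results, (<-d.out).(int))
		}
	}

	cancel()
	_ = d.Close()
	return results
}

func main() {
	fmt.Println(runDoubler(21, "ignored", 4)) // [42 8]
}
```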

Inputs and outputs

All communication between modules goes through inputs and outputs. Input is a structure that contains a channel carrying values of type any. It also has a name, which identifies it within the module's scope.

type Input struct {
	data chan any
	name string
}

It has the following methods:

  • Close - closes the input's channel
  • Push - sends a message to the channel
  • Listen - returns the channel on which new messages arrive
  • Name - returns the input name
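To make the four methods concrete, here is a rough local re-creation of the structure. The field names follow the struct shown above, but the method bodies and the newInput constructor with an explicit capacity are assumptions; the SDK's actual code may differ.

```go
package main

import "fmt"

// input mirrors the fields of the README's Input struct.
// The method bodies below are assumptions for illustration.
type input struct {
	data chan any
	name string
}

// newInput is a hypothetical constructor with an explicit buffer size.
func newInput(name string, capacity int) *input {
	return &input{data: make(chan any, capacity), name: name}
}

// Push sends a message to the channel; it blocks when the buffer is full.
func (i *input) Push(msg any) { i.data <- msg }

// Listen exposes the channel as receive-only.
func (i *input) Listen() <-chan any { return i.data }

// Name returns the input name.
func (i *input) Name() string { return i.name }

// Close closes the channel; pushing after Close would panic.
func (i *input) Close() error {
	close(i.data)
	return nil
}

func main() {
	in := newInput("blocks", 2)
	in.Push("first")
	in.Push("second")
	_ = in.Close() // buffered messages stay readable after close

	for msg := range in.Listen() {
		fmt.Println(in.Name(), msg)
	}
}
```

Returning a receive-only channel from Listen keeps consumers from sending to or closing the channel by accident.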

Output is the set of inputs connected to it. When a module sends a message to an output, the output iterates over all connected inputs and pushes the message to each of them. An output also has a name that identifies it.

type Output struct {
	connectedInputs []*Input
	name            string

	mx sync.RWMutex
}

It has the following methods:

  • ConnectedInputs - returns all connected inputs
  • Push - pushes a message to all connected inputs
  • Attach - adds an input to the list of connected inputs
  • Name - returns the output name
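The fan-out behaviour of Push, and the likely role of the RWMutex, can be sketched as follows. The struct fields follow the README; the method bodies and the locking strategy are assumptions, not the SDK's code.

```go
package main

import (
	"fmt"
	"sync"
)

// input is reduced to the fields this sketch needs.
type input struct {
	data chan any
	name string
}

// output mirrors the fields of the README's Output struct.
type output struct {
	connectedInputs []*input
	name            string

	mx sync.RWMutex
}

// Attach takes the write lock because it mutates the slice.
func (o *output) Attach(in *input) {
	o.mx.Lock()
	defer o.mx.Unlock()
	o.connectedInputs = append(o.connectedInputs, in)
}

// Push takes the read lock, so concurrent pushes do not exclude each other,
// and delivers the same message to every connected input.
func (o *output) Push(msg any) {
	o.mx.RLock()
	defer o.mx.RUnlock()
	for _, in := range o.connectedInputs {
		in.data <- msg
	}
}

func main() {
	a := &input{data: make(chan any, 1), name: "a"}
	b := &input{data: make(chan any, 1), name: "b"}

	out := &output{name: "results"}
	out.Attach(a)
	out.Attach(b)
	out.Push("hello")

	fmt.Println(<-a.data, <-b.data) // hello hello
}
```

Note that in this sketch a slow consumer with a full channel blocks Push for every other connected input as well.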

The SDK has a helper function Connect:

func Connect(outputModule, inputModule Module, outputName, inputName string) error 

The function receives outputModule and inputModule, the two modules to be connected. It also receives the names of the output and the input in those modules that will be connected.
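What connecting two modules amounts to can be sketched with maps of channels. Everything here is hypothetical (the node type, the connect function, and the lowercase error variables); it only illustrates the idea of looking up both names and attaching the input to the output, with unknown names reported as errors.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errUnknownInput  = errors.New("unknown input")
	errUnknownOutput = errors.New("unknown output")
)

// node is a hypothetical stand-in for a module: named inputs are channels,
// and each named output holds the channels attached to it.
type node struct {
	inputs  map[string]chan any
	outputs map[string][]chan any
}

// connect attaches the named input of dst to the named output of src.
func connect(src, dst *node, outputName, inputName string) error {
	if _, ok := src.outputs[outputName]; !ok {
		return fmt.Errorf("%w: %s", errUnknownOutput, outputName)
	}
	in, ok := dst.inputs[inputName]
	if !ok {
		return fmt.Errorf("%w: %s", errUnknownInput, inputName)
	}
	src.outputs[outputName] = append(src.outputs[outputName], in)
	return nil
}

func main() {
	producer := &node{outputs: map[string][]chan any{"blocks": nil}}
	consumer := &node{inputs: map[string]chan any{"in": make(chan any, 1)}}

	if err := connect(producer, consumer, "blocks", "in"); err != nil {
		fmt.Println("connect failed:", err)
		return
	}

	// A message sent to the output now reaches the consumer's input.
	for _, ch := range producer.outputs["blocks"] {
		ch <- "block #1"
	}
	fmt.Println(<-consumer.inputs["in"])

	// Unknown names are reported as errors that callers can match.
	err := connect(producer, consumer, "missing", "in")
	fmt.Println(errors.Is(err, errUnknownOutput))
}
```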

Workflow

Modules can be combined into a workflow. A workflow is a set of modules connected in a certain sequence. To create a Workflow, call the function NewWorkflow:

func NewWorkflow(modules ...Module) *Workflow

Workflow has the following methods:

  • Add - adds a module under the name returned by its Name method
  • AddWithName - adds a module under the name passed to it
  • Get - returns a module by the name it was added with
  • Connect - connects two modules, identified by name, through the named output of one and the named input of the other
  • Start - starts all modules in the workflow
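The naming rules for Add, AddWithName and Get suggest a registry keyed by name. The sketch below is an assumption about that idea, not the SDK's Workflow: the workflow, namer and stub types are hypothetical, and AddWithName is simplified to return nothing, whereas the real method returns an error.

```go
package main

import (
	"errors"
	"fmt"
)

var errUnknownModule = errors.New("unknown module")

// namer is the only part of the Module interface this sketch needs.
type namer interface{ Name() string }

// workflow is a hypothetical registry of modules keyed by name.
type workflow struct {
	modules map[string]namer
}

// newWorkflow registers each module under the name it reports itself.
func newWorkflow(modules ...namer) *workflow {
	wf := &workflow{modules: make(map[string]namer)}
	for _, m := range modules {
		wf.modules[m.Name()] = m
	}
	return wf
}

// AddWithName registers a module under a caller-chosen name.
func (wf *workflow) AddWithName(m namer, name string) {
	wf.modules[name] = m
}

// Get looks a module up by the name it was registered with.
func (wf *workflow) Get(name string) (namer, error) {
	m, ok := wf.modules[name]
	if !ok {
		return nil, fmt.Errorf("%w: %s", errUnknownModule, name)
	}
	return m, nil
}

// stub is a trivial module whose name is its own value.
type stub string

func (s stub) Name() string { return string(s) }

func main() {
	wf := newWorkflow(stub("cron"), stub("grpc"))
	wf.AddWithName(stub("grpc"), "grpc-backup")

	for _, name := range []string{"cron", "grpc-backup", "missing"} {
		if _, err := wf.Get(name); err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Println("found", name)
	}
}
```

A custom name is useful when the same module type appears more than once in a workflow, since two instances would otherwise report the same Name.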

Implemented modules

The SDK ships with some modules that can be used when creating a workflow.

gRPC

The gRPC module provides a default client and server implementation. Detailed docs can be found here.

Cron

The Cron module implements a cron scheduler. Detailed docs can be found here.

Documentation

Constants

This section is empty.

Variables

var (
	ErrUnknownInput  = errors.New("unknown input")
	ErrUnknownOutput = errors.New("unknown output")
)

errors

var (
	ErrUnknownModule = errors.New("unknown module")
)

errors

Functions

func Connect

func Connect(outputModule, inputModule Module, outputName, inputName string) error

Connect -

Types

type BaseModule added in v0.0.2

type BaseModule struct {
	Log zerolog.Logger
	G   workerpool.Group
	// contains filtered or unexported fields
}

func New added in v0.0.2

func New(name string) BaseModule

func (*BaseModule) AttachTo added in v0.0.2

func (m *BaseModule) AttachTo(outputModule Module, outputName, inputName string) error

func (*BaseModule) Close added in v0.0.2

func (*BaseModule) Close() error

func (*BaseModule) CreateInput added in v0.0.2

func (m *BaseModule) CreateInput(name string)

func (*BaseModule) CreateInputWithCapacity added in v0.0.5

func (m *BaseModule) CreateInputWithCapacity(name string, cap int)

func (*BaseModule) CreateOutput added in v0.0.2

func (m *BaseModule) CreateOutput(name string)

func (*BaseModule) Input added in v0.0.2

func (m *BaseModule) Input(name string) (*Input, error)

func (*BaseModule) MustInput added in v0.0.3

func (m *BaseModule) MustInput(name string) *Input

func (*BaseModule) MustOutput added in v0.0.3

func (m *BaseModule) MustOutput(name string) *Output
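The source does not document how MustInput and MustOutput behave when the name is unknown. By common Go convention a Must-prefixed variant panics instead of returning an error; the sketch below illustrates that convention under this assumption, using a hypothetical registry type rather than BaseModule itself.

```go
package main

import (
	"errors"
	"fmt"
)

var errUnknownInput = errors.New("unknown input")

// registry is a hypothetical stand-in for a module's set of inputs.
type registry map[string]chan any

// Input returns the channel for name, or an error if it does not exist.
func (r registry) Input(name string) (chan any, error) {
	ch, ok := r[name]
	if !ok {
		return nil, fmt.Errorf("%w: %s", errUnknownInput, name)
	}
	return ch, nil
}

// MustInput panics on an unknown name (assumed behaviour, see lead-in).
func (r registry) MustInput(name string) chan any {
	ch, err := r.Input(name)
	if err != nil {
		panic(err)
	}
	return ch
}

// tryMust turns the panic back into an error, for demonstration only.
func tryMust(r registry, name string) (err error) {
	defer func() {
		if rec := recover(); rec != nil {
			err = fmt.Errorf("recovered: %v", rec)
		}
	}()
	r.MustInput(name)
	return nil
}

func main() {
	r := registry{"blocks": make(chan any)}
	fmt.Println(tryMust(r, "blocks"))
	fmt.Println(tryMust(r, "missing"))
}
```

Under that assumption, the Must variants suit wiring code where the names are fixed at compile time, and the error-returning variants suit names that come from configuration.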

func (*BaseModule) Name added in v0.0.2

func (m *BaseModule) Name() string

func (*BaseModule) Output added in v0.0.2

func (m *BaseModule) Output(name string) (*Output, error)

func (*BaseModule) Start added in v0.0.2

func (*BaseModule) Start(_ context.Context)

type Input

type Input struct {
	// contains filtered or unexported fields
}

Input -

func NewInput

func NewInput(name string) *Input

NewInput -

func NewInputWithCapacity added in v0.0.5

func NewInputWithCapacity(name string, cap int) *Input

NewInputWithCapacity -
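Assuming the cap argument sets the buffer size of the input's underlying channel (the source does not say so explicitly, nor which capacity NewInput uses), the practical effect of capacity can be shown with plain channels. The tryPush helper is hypothetical and uses a non-blocking send to make the difference visible.

```go
package main

import "fmt"

// tryPush attempts a non-blocking send and reports whether it succeeded.
func tryPush(ch chan any, msg any) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan any)
	buffered := make(chan any, 2)

	fmt.Println(tryPush(unbuffered, "a")) // false: no listener is waiting
	fmt.Println(tryPush(buffered, "a"))   // true
	fmt.Println(tryPush(buffered, "b"))   // true
	fmt.Println(tryPush(buffered, "c"))   // false: the buffer is full
}
```

A larger capacity lets a producer run ahead of a slow consumer, at the cost of holding more messages in memory.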

func (*Input) Close

func (input *Input) Close() error

Close -

func (*Input) Listen

func (input *Input) Listen() <-chan any

Listen -

func (*Input) Name

func (input *Input) Name() string

Name -

func (*Input) Push

func (input *Input) Push(msg any)

Push -

type Module

type Module interface {
	io.Closer

	Name() string

	Start(ctx context.Context)

	Input(name string) (*Input, error)
	MustInput(name string) *Input
	Output(name string) (*Output, error)
	MustOutput(name string) *Output
	AttachTo(output Module, outputName, inputName string) error
}

Module is the interface which modules have to implement.

type Output

type Output struct {
	// contains filtered or unexported fields
}

Output -

func NewOutput

func NewOutput(name string) *Output

NewOutput -

func (*Output) Attach

func (output *Output) Attach(input *Input)

Attach -

func (*Output) ConnectedInputs

func (output *Output) ConnectedInputs() []*Input

ConnectedInputs -

func (*Output) Name

func (output *Output) Name() string

Name -

func (*Output) Push

func (output *Output) Push(msg any)

Push -

type Workflow

type Workflow struct {
	// contains filtered or unexported fields
}

Workflow -

func NewWorkflow

func NewWorkflow(modules ...Module) *Workflow

NewWorkflow -

func (*Workflow) Add

func (wf *Workflow) Add(module Module) error

Add - adds module to workflow

func (*Workflow) AddWithName

func (wf *Workflow) AddWithName(module Module, name string) error

AddWithName - adds module to workflow with custom name

func (*Workflow) Connect

func (wf *Workflow) Connect(srcModule, srcOutput, destModule, destInput string) error

Connect - connects the destination module's input to the source module's output

func (*Workflow) Get

func (wf *Workflow) Get(name string) (Module, error)

Get - gets module from the workflow by name

func (*Workflow) Start

func (wf *Workflow) Start(ctx context.Context)

Start - starts workflow

Directories

Path Synopsis
pb
