orchestrator

package
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidProcessorParentType     = cerrors.New("invalid processor parent type")
	ErrPipelineHasProcessorsAttached  = cerrors.New("pipeline has processors attached")
	ErrPipelineHasConnectorsAttached  = cerrors.New("pipeline has connectors attached")
	ErrConnectorHasProcessorsAttached = cerrors.New("connector has processors attached")
	ErrImmutableProvisionedByConfig   = cerrors.New("entity was provisioned by a config file and cannot be mutated through the API, please change the corresponding config file instead")
)

Functions

This section is empty.

Types

type ConnectorOrchestrator

type ConnectorOrchestrator base

func (*ConnectorOrchestrator) Create

func (c *ConnectorOrchestrator) Create(
	ctx context.Context,
	t connector.Type,
	plugin string,
	pipelineID string,
	config connector.Config,
) (*connector.Instance, error)

func (*ConnectorOrchestrator) Delete

func (c *ConnectorOrchestrator) Delete(ctx context.Context, id string) error

func (*ConnectorOrchestrator) Get

func (*ConnectorOrchestrator) Inspect added in v0.4.0

func (*ConnectorOrchestrator) List

func (*ConnectorOrchestrator) Update

func (*ConnectorOrchestrator) Validate added in v0.2.1

func (c *ConnectorOrchestrator) Validate(
	ctx context.Context,
	t connector.Type,
	plugin string,
	config connector.Config,
) error

type ConnectorPluginOrchestrator added in v0.9.0

type ConnectorPluginOrchestrator base

func (*ConnectorPluginOrchestrator) List added in v0.9.0

type ConnectorPluginService added in v0.9.0

type ConnectorPluginService interface {
	List(ctx context.Context) (map[string]connectorPlugin.Specification, error)
	NewDispenser(logger log.CtxLogger, name string) (connectorPlugin.Dispenser, error)
	ValidateSourceConfig(ctx context.Context, name string, settings map[string]string) error
	ValidateDestinationConfig(ctx context.Context, name string, settings map[string]string) error
}

type ConnectorService

type ConnectorService interface {
	List(ctx context.Context) map[string]*connector.Instance
	Get(ctx context.Context, id string) (*connector.Instance, error)
	Create(ctx context.Context, id string, t connector.Type, plugin string, pipelineID string, c connector.Config, p connector.ProvisionType) (*connector.Instance, error)
	Delete(ctx context.Context, id string, dispenserFetcher connector.PluginDispenserFetcher) error
	Update(ctx context.Context, id string, c connector.Config) (*connector.Instance, error)

	AddProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error)
	RemoveProcessor(ctx context.Context, connectorID string, processorID string) (*connector.Instance, error)
}

type Orchestrator

type Orchestrator struct {
	Processors       *ProcessorOrchestrator
	Pipelines        *PipelineOrchestrator
	Connectors       *ConnectorOrchestrator
	ConnectorPlugins *ConnectorPluginOrchestrator
	ProcessorPlugins *ProcessorPluginOrchestrator
}

func NewOrchestrator

func NewOrchestrator(
	db database.DB,
	logger log.CtxLogger,
	pipelines PipelineService,
	connectors ConnectorService,
	processors ProcessorService,
	connectorPlugins ConnectorPluginService,
	processorPlugins ProcessorPluginService,
) *Orchestrator

type PipelineOrchestrator

type PipelineOrchestrator base

func (*PipelineOrchestrator) Create

func (*PipelineOrchestrator) Delete

func (po *PipelineOrchestrator) Delete(ctx context.Context, id string) error

func (*PipelineOrchestrator) Get

func (*PipelineOrchestrator) List

func (*PipelineOrchestrator) Start

func (po *PipelineOrchestrator) Start(ctx context.Context, id string) error

func (*PipelineOrchestrator) Stop

func (po *PipelineOrchestrator) Stop(ctx context.Context, id string, force bool) error

func (*PipelineOrchestrator) Update

func (*PipelineOrchestrator) UpdateDLQ added in v0.5.0

func (po *PipelineOrchestrator) UpdateDLQ(ctx context.Context, id string, dlq pipeline.DLQ) (*pipeline.Instance, error)

type PipelineService

type PipelineService interface {
	Start(ctx context.Context, connFetcher pipeline.ConnectorFetcher, procService pipeline.ProcessorService, pluginFetcher pipeline.PluginDispenserFetcher, pipelineID string) error
	// Stop initiates a stop of the given pipeline. The method does not wait for
	// the pipeline (and its nodes) to actually stop.
	// When force is false the pipeline will try to stop gracefully and drain
	// any in-flight messages that have not yet reached the destination. When
	// force is true the pipeline will stop without draining in-flight messages.
	// It is allowed to execute a force stop even after a graceful stop was
	// requested.
	Stop(ctx context.Context, pipelineID string, force bool) error

	List(ctx context.Context) map[string]*pipeline.Instance
	Get(ctx context.Context, id string) (*pipeline.Instance, error)
	Create(ctx context.Context, id string, cfg pipeline.Config, p pipeline.ProvisionType) (*pipeline.Instance, error)
	Update(ctx context.Context, pipelineID string, cfg pipeline.Config) (*pipeline.Instance, error)
	Delete(ctx context.Context, pipelineID string) error
	UpdateDLQ(ctx context.Context, id string, dlq pipeline.DLQ) (*pipeline.Instance, error)

	AddConnector(ctx context.Context, pipelineID string, connectorID string) (*pipeline.Instance, error)
	RemoveConnector(ctx context.Context, pipelineID string, connectorID string) (*pipeline.Instance, error)
	AddProcessor(ctx context.Context, pipelineID string, processorID string) (*pipeline.Instance, error)
	RemoveProcessor(ctx context.Context, pipelineID string, processorID string) (*pipeline.Instance, error)
}

type ProcessorOrchestrator

type ProcessorOrchestrator base

func (*ProcessorOrchestrator) Create

func (p *ProcessorOrchestrator) Create(
	ctx context.Context,
	plugin string,
	parent processor.Parent,
	cfg processor.Config,
	cond string,
) (*processor.Instance, error)

func (*ProcessorOrchestrator) Delete

func (p *ProcessorOrchestrator) Delete(ctx context.Context, id string) error

func (*ProcessorOrchestrator) Get

func (*ProcessorOrchestrator) InspectIn added in v0.5.0

func (p *ProcessorOrchestrator) InspectIn(
	ctx context.Context,
	id string,
) (*inspector.Session, error)

func (*ProcessorOrchestrator) InspectOut added in v0.5.0

func (p *ProcessorOrchestrator) InspectOut(
	ctx context.Context,
	id string,
) (*inspector.Session, error)

func (*ProcessorOrchestrator) List

func (*ProcessorOrchestrator) Update

type ProcessorPluginOrchestrator added in v0.9.0

type ProcessorPluginOrchestrator base

func (*ProcessorPluginOrchestrator) List added in v0.9.0

func (*ProcessorPluginOrchestrator) RegisterStandalonePlugin added in v0.9.1

func (ps *ProcessorPluginOrchestrator) RegisterStandalonePlugin(ctx context.Context, path string) (string, error)

type ProcessorPluginService added in v0.9.0

type ProcessorPluginService interface {
	List(ctx context.Context) (map[string]processorSdk.Specification, error)
	NewProcessor(ctx context.Context, pluginName string, id string) (processorSdk.Processor, error)
	RegisterStandalonePlugin(ctx context.Context, path string) (string, error)
}

type ProcessorService

type ProcessorService interface {
	List(ctx context.Context) map[string]*processor.Instance
	Get(ctx context.Context, id string) (*processor.Instance, error)
	Create(ctx context.Context, id string, plugin string, parent processor.Parent, cfg processor.Config, p processor.ProvisionType, condition string) (*processor.Instance, error)
	MakeRunnableProcessor(ctx context.Context, i *processor.Instance) (*processor.RunnableProcessor, error)
	Update(ctx context.Context, id string, cfg processor.Config) (*processor.Instance, error)
	Delete(ctx context.Context, id string) error
}

Directories

Path Synopsis
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL