pipeline

package
v0.3.0-nightly.20220806 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2022 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTimeout             = cerrors.New("operation timed out")
	ErrGracefulShutdown    = cerrors.New("graceful shutdown")
	ErrPipelineRunning     = cerrors.New("pipeline is running")
	ErrPipelineNotRunning  = cerrors.New("pipeline not running")
	ErrInstanceNotFound    = cerrors.New("pipeline instance not found")
	ErrNameMissing         = cerrors.New("must provide a pipeline name")
	ErrNameAlreadyExists   = cerrors.New("pipeline name already exists")
	ErrConnectorIDNotFound = cerrors.New("connector ID not found")
	ErrProcessorIDNotFound = cerrors.New("processor ID not found")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	Name        string
	Description string
}

Config holds configuration data for building a pipeline.

type ConnectorFetcher

type ConnectorFetcher interface {
	Get(ctx context.Context, id string) (connector.Connector, error)
}

ConnectorFetcher can fetch a connector instance.

type Instance

type Instance struct {
	ID        string
	Config    Config
	Status    Status
	Error     string
	CreatedAt time.Time
	UpdatedAt time.Time

	ConnectorIDs []string
	ProcessorIDs []string
	// contains filtered or unexported fields
}

Instance manages a collection of Connectors, which can be either Destination or Source. The pipeline sets up its publishers and subscribers based on whether the Connector in question is a Destination or a Source.

func (*Instance) Wait

func (p *Instance) Wait() error

type ProcessorFetcher

type ProcessorFetcher interface {
	Get(ctx context.Context, id string) (*processor.Instance, error)
}

ProcessorFetcher can fetch a processor instance.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service manages pipelines.

func NewService

func NewService(logger log.CtxLogger, db database.DB) *Service

NewService initializes and returns a pipeline Service.

func (*Service) AddConnector

func (s *Service) AddConnector(ctx context.Context, pl *Instance, connectorID string) (*Instance, error)

AddConnector adds a connector to a pipeline.

func (*Service) AddProcessor

func (s *Service) AddProcessor(ctx context.Context, pl *Instance, processorID string) (*Instance, error)

AddProcessor adds a processor to a pipeline.

func (*Service) Create

func (s *Service) Create(ctx context.Context, id string, cfg Config) (*Instance, error)

Create will create a new pipeline instance with the given config and return it if it was successfully saved to the database.

func (*Service) Delete

func (s *Service) Delete(ctx context.Context, pl *Instance) error

Delete removes a pipeline instance from the Service.

func (*Service) Get

func (s *Service) Get(ctx context.Context, id string) (*Instance, error)

Get will return a single pipeline instance or an error.

func (*Service) Init

func (s *Service) Init(
	ctx context.Context,
	connFetcher ConnectorFetcher,
	procFetcher ProcessorFetcher,
) error

Init fetches instances from the store and starts pipelines that are supposed to be running. Connectors and processors should be initialized before calling this function.

func (*Service) List

func (s *Service) List(ctx context.Context) map[string]*Instance

List returns all pipeline instances in the Service.

func (*Service) RemoveConnector

func (s *Service) RemoveConnector(ctx context.Context, pl *Instance, connectorID string) (*Instance, error)

RemoveConnector removes a connector from a pipeline.

func (*Service) RemoveProcessor

func (s *Service) RemoveProcessor(ctx context.Context, pl *Instance, processorID string) (*Instance, error)

RemoveProcessor removes a processor from a pipeline.

func (*Service) Start

func (s *Service) Start(
	ctx context.Context,
	connFetcher ConnectorFetcher,
	procFetcher ProcessorFetcher,
	pl *Instance,
) error

Start builds and starts a pipeline instance.

func (*Service) Stop

func (s *Service) Stop(ctx context.Context, pl *Instance) error

Stop will attempt to gracefully stop a given pipeline by calling each node's Stop function.

func (*Service) StopAll

func (s *Service) StopAll(ctx context.Context, reason error)

StopAll will ask all the pipelines to stop gracefully (i.e. that existing messages get processed but not new messages get produced).

func (*Service) Update

func (s *Service) Update(ctx context.Context, pl *Instance, cfg Config) (*Instance, error)

Update will update a pipeline instance config.

func (*Service) Wait

func (s *Service) Wait(timeout time.Duration) error

Wait blocks until all pipelines are stopped or until the timeout is reached. Returns:

(1) nil if all the pipelines are gracefully stopped,

(2) an error, if the pipelines could not have been gracefully stopped,

(3) ErrTimeout if the pipelines were not stopped within the given timeout.

type Status

type Status int

Status defines the running status of a pipeline.

const (
	StatusRunning Status = iota + 1
	StatusSystemStopped
	StatusUserStopped
	StatusDegraded
)

func (Status) String

func (i Status) String() string

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store handles the persistence and fetching of pipeline instances.

func NewStore

func NewStore(db database.DB) *Store

func (*Store) Delete

func (s *Store) Delete(ctx context.Context, id string) error

Delete deletes instance under the key id and returns nil on success, error otherwise.

func (*Store) Get

func (s *Store) Get(ctx context.Context, id string) (*Instance, error)

Get will return the pipeline instance for a given id or an error.

func (*Store) GetAll

func (s *Store) GetAll(ctx context.Context) (map[string]*Instance, error)

GetAll returns all instances stored in the database.

func (*Store) Set

func (s *Store) Set(ctx context.Context, id string, instance *Instance) error

Set stores instance under the key id and returns nil on success, error otherwise.

Directories

Path Synopsis
Package stream defines a message and nodes that can be composed into a data pipeline.
Package stream defines a message and nodes that can be composed into a data pipeline.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL