engine

package
v0.0.0-...-d841f61 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 11, 2021 License: AGPL-3.0 Imports: 14 Imported by: 0

Documentation

Overview

engine contain implementation of canopsis engine.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsConnectionError

func IsConnectionError(err error) bool

IsConnectionError uses to check if stop engine or continue work.

Types

type Consumer

type Consumer interface {
	Consume(context.Context) error
}

Consumer interface is used to implement AMQP consumer of engine. If Consume returns error engine will be stopped.

func NewDefaultConsumer

func NewDefaultConsumer(
	name, queue string,
	consumePrefetchCount, consumePrefetchSize int,
	purgeQueue bool,
	nextExchange, nextQueue, fifoExchange, fifoQueue string,
	connection libamqp.Connection,
	processor MessageProcessor,
	logger zerolog.Logger,
) Consumer

NewDefaultConsumer creates consumer.

func NewRPCServer

func NewRPCServer(
	name, queue string,
	consumePrefetchCount, consumePrefetchSize int,
	connection libamqp.Connection,
	processor MessageProcessor,
	logger zerolog.Logger,
) Consumer

NewRPCServer creates consumer.

type Edge

type Edge struct {
	From string `json:"from"`
	To   string `json:"to"`
}

type Engine

type Engine interface {
	// AddConsumer adds AMQP consumer to engine.
	AddConsumer(Consumer)
	// AddPeriodicalWorker adds periodical worker to engine.
	AddPeriodicalWorker(PeriodicalWorker)
	// Run starts goroutines for all consumers and periodical workers.
	// Engine stops if one of consumer or periodical worker return error.
	Run(context.Context) error
}

Engine interface is used to implement canopsis engine.

func New

func New(
	init func(ctx context.Context) error,
	deferFunc func(),
	logger zerolog.Logger,
) Engine

type MessageProcessor

type MessageProcessor interface {
	Process(ctx context.Context, d amqp.Delivery) (newMessage []byte, err error)
}

MessageProcessor interface is used to implement AMQP message processor of consumer. If Process returns error engine will be stopped.

type PeriodicalWorker

type PeriodicalWorker interface {
	GetInterval() time.Duration
	Work(ctx context.Context) error
}

PeriodicalWork interface is used to implement engine periodical worker. If Work returns error engine will be stopped.

func NewLoadConfigPeriodicalWorker

func NewLoadConfigPeriodicalWorker(
	periodicalInterval time.Duration,
	adapter config.Adapter,
	updater config.Updater,
	logger zerolog.Logger,
) PeriodicalWorker

func NewRunInfoPeriodicalWorker

func NewRunInfoPeriodicalWorker(
	periodicalInterval time.Duration,
	manager RunInfoManager,
	info RunInfo,
	logger zerolog.Logger,
) PeriodicalWorker

type RPCClient

type RPCClient interface {
	// Consumer receives RPC responses from AMQP queue.
	Consumer
	// Call receives RPC request and publishes it to AMQP queue.
	Call(m RPCMessage) error
}

RPCClient interface is used to implement AMQP RPC client.

func NewRPCClient

func NewRPCClient(
	name, serverQueueName, clientQueueName string,
	consumePrefetchCount, consumePrefetchSize int,
	processor RPCMessageProcessor,
	amqpChannel libamqp.Channel,
	logger zerolog.Logger,
) RPCClient

NewRPCClient creates new AMQP RPC client.

type RPCMessage

type RPCMessage struct {
	CorrelationID string
	Body          []byte
}

RPCMessage is AMQP RPC request or response.

type RPCMessageProcessor

type RPCMessageProcessor interface {
	Process(RPCMessage) error
}

RPCMessageProcessor interface is used to implement AMQP RPC response processor of consumer. If Process returns error engine will be stopped.

type RunInfo

type RunInfo struct {
	Name            string `json:"name"`
	ConsumeQueue    string `json:"input_queue"`
	PublishQueue    string `json:"output_queue"`
	PublishExchange string `json:"output_exchange,omitempty"`
}

RunInfo is engine run information to detect engines order.

type RunInfoGraph

type RunInfoGraph struct {
	Nodes []RunInfo `json:"nodes"`
	Edges []Edge    `json:"edges"`
}

type RunInfoManager

type RunInfoManager interface {
	Save(ctx context.Context, info RunInfo, expiration time.Duration) error
	Get(ctx context.Context, engineName string) (*RunInfo, error)
	GetAll(ctx context.Context) ([]RunInfo, error)
	GetGraph(ctx context.Context) (*RunInfoGraph, error)
	ClearAll(ctx context.Context) error
}

RunInfoManager interface is used to implement engine run info storage.

func NewRunInfoManager

func NewRunInfoManager(client redis.Cmdable, key ...string) RunInfoManager

NewRunInfoManager creates new run info manager.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL