pipeline

package
v0.0.0-...-196de0d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2024 License: Apache-2.0 Imports: 5 Imported by: 1

Documentation

Index

Constants

View Source
const (
	MetricInputType  = iota
	ServiceInputType = iota
	FilterType       = iota
	ProcessorType    = iota
	AggregatorType   = iota
)

logtail plugin type define

Variables

View Source
var Aggregators = map[string]AggregatorCreator{}
View Source
var Extensions = map[string]ExtensionCreator{}
View Source
var Flushers = map[string]FlusherCreator{}
View Source
var MetricInputs = map[string]MetricCreator{}
View Source
var Processors = map[string]ProcessorCreator{}
View Source
var ServiceInputs = map[string]ServiceCreator{}

Functions

func AddAggregatorCreator

func AddAggregatorCreator(name string, creator AggregatorCreator)

func AddExtensionCreator

func AddExtensionCreator(name string, creator ExtensionCreator)

func AddFlusherCreator

func AddFlusherCreator(name string, creator FlusherCreator)

func AddMetricCreator

func AddMetricCreator(name string, creator MetricCreator)

func AddProcessorCreator

func AddProcessorCreator(name string, creator ProcessorCreator)

func AddServiceCreator

func AddServiceCreator(name string, creator ServiceCreator)

Types

type Aggregator

type Aggregator interface {
	// Init called for init some system resources, like socket, mutex...
	// return flush interval(ms) and error flag, if interval is 0, use default interval
	Init(Context, LogGroupQueue) (int, error)

	// Description returns a one-sentence description on the Input.
	Description() string

	// Reset resets the aggregators caches and aggregates.
	Reset()
}

Aggregator is an interface for implementing an Aggregator plugin. the RunningAggregator wraps this interface and guarantees that

type AggregatorCreator

type AggregatorCreator func() Aggregator

type AggregatorV1

type AggregatorV1 interface {
	Aggregator
	// Add the metric to the aggregator.
	Add(log *protocol.Log, ctx map[string]interface{}) error

	// Flush pushes the current aggregates to the accumulator.
	Flush() []*protocol.LogGroup
}

AggregatorV1 Add, Flush, and Reset can not be called concurrently, so locking is not required when implementing an Aggregator plugin.

type AggregatorV2

type AggregatorV2 interface {
	Aggregator
	// Add the metric to the aggregator.
	Record(*models.PipelineGroupEvents, PipelineContext) error
	// GetResult the current aggregates to the accumulator.
	GetResult(PipelineContext) error
}

AggregatorV2 Apply, Push, and Reset can not be called concurrently, so locking is not required when implementing an Aggregator plugin.

type AsyncControl

type AsyncControl struct {
	// contains filtered or unexported fields
}

AsyncControl is an asynchronous execution control that can be canceled.

func NewAsyncControl

func NewAsyncControl() *AsyncControl

func (*AsyncControl) CancelToken

func (p *AsyncControl) CancelToken() <-chan struct{}

CancelToken returns a readonly channel that can be subscribed to as a cancel token

func (*AsyncControl) Notify

func (p *AsyncControl) Notify()

func (*AsyncControl) Reset

func (p *AsyncControl) Reset()

Reset cancel channal

func (*AsyncControl) Run

func (p *AsyncControl) Run(task func(*AsyncControl))

Run function as a Task

func (*AsyncControl) WaitCancel

func (p *AsyncControl) WaitCancel()

Waiting for executing task to be canceled

type Collector

type Collector interface {
	AddData(tags map[string]string,
		fields map[string]string,
		t ...time.Time)

	AddDataArray(tags map[string]string,
		columns []string,
		values []string,
		t ...time.Time)

	AddRawLog(log *protocol.Log)

	AddDataWithContext(tags map[string]string,
		fields map[string]string,
		ctx map[string]interface{},
		t ...time.Time)

	AddDataArrayWithContext(tags map[string]string,
		columns []string,
		values []string,
		ctx map[string]interface{},
		t ...time.Time)

	AddRawLogWithContext(log *protocol.Log, ctx map[string]interface{})
}

Collector ...

type CommonContext

type CommonContext struct {
	Project    string
	Logstore   string
	ConfigName string
}

type Context

type Context interface {
	InitContext(project, logstore, configName string)

	GetConfigName() string
	GetProject() string
	GetLogstore() string
	GetRuntimeContext() context.Context
	GetExtension(name string, cfg any) (Extension, error)
	RegisterCounterMetric(metric CounterMetric)
	RegisterStringMetric(metric StringMetric)
	RegisterLatencyMetric(metric LatencyMetric)

	MetricSerializeToPB(log *protocol.Log)

	SaveCheckPoint(key string, value []byte) error
	GetCheckPoint(key string) (value []byte, exist bool)
	SaveCheckPointObject(key string, obj interface{}) error
	GetCheckPointObject(key string, obj interface{}) (exist bool)
}

Context for plugin

type CounterMetric

type CounterMetric interface {
	Name() string

	Add(v int64)

	// Clear same with set
	Clear(v int64)

	Get() int64

	Serialize(log *protocol.Log)
}

type Extension

type Extension interface {
	// Description returns a one-sentence description on the Extension
	Description() string

	// Init called for init some system resources, like socket, mutex...
	Init(Context) error

	// Stop stops the services and release resources
	Stop() error
}

Extension ...

type ExtensionCreator

type ExtensionCreator func() Extension

type Flusher

type Flusher interface {
	// Init called for init some system resources, like socket, mutex...
	Init(Context) error

	// Description returns a one-sentence description on the Input.
	Description() string

	// IsReady checks if flusher is ready to accept more data.
	// @projectName, @logstoreName, @logstoreKey: meta of the corresponding data.
	// Note: If SetUrgent is called, please make some adjustment so that IsReady
	//   can return true to accept more data in time and config instance can be
	//   stopped gracefully.
	IsReady(projectName string, logstoreName string, logstoreKey int64) bool

	// SetUrgent indicates the flusher that it will be destroyed soon.
	// @flag indicates if main program (Logtail mostly) will exit after calling this.
	//
	// Note: there might be more data to flush after SetUrgent is called, and if flag
	//   is true, these data will be passed to flusher through IsReady/Export before
	//   program exits.
	//
	// Recommendation: set some state flags in this method to guide the behavior
	//   of other methods.
	SetUrgent(flag bool)

	// Stop stops flusher and release resources.
	// It is time for flusher to do cleaning jobs, includes:
	// 1. Export cached but not flushed data. For flushers that contain some kinds of
	//   aggregation or buffering, it is important to flush cached out now, otherwise
	//   data will lost.
	// 2. Release opened resources: goroutines, file handles, connections, etc.
	// 3. Maybe more, it depends.
	// In a word, flusher should only have things that can be recycled by GC after this.
	Stop() error
}

Flusher ... Sample flusher implementation: see plugin_manager/flusher_sls.gox.

type FlusherCreator

type FlusherCreator func() Flusher

type FlusherV1

type FlusherV1 interface {
	Flusher
	// Flush flushes data to destination, such as SLS, console, file, etc.
	// It is expected to return no error at most time because IsReady will be called
	// before it to make sure there is space for next data.
	Flush(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error
}

type FlusherV2

type FlusherV2 interface {
	Flusher
	// Export data to destination, such as gRPC, console, file, etc.
	// It is expected to return no error at most time because IsReady will be called
	// before it to make sure there is space for next data.
	Export([]*models.PipelineGroupEvents, PipelineContext) error
}

type LatencyMetric

type LatencyMetric interface {
	Name() string

	Begin()

	Clear()

	End()
	// nano second
	Get() int64

	Serialize(log *protocol.Log)
}

type LogGroupQueue

type LogGroupQueue interface {
	// no blocking
	Add(loggroup *protocol.LogGroup) error
	AddWithWait(loggroup *protocol.LogGroup, duration time.Duration) error
}

LogGroupQueue for aggregator, Non blocked if aggregator's buffer is full, aggregator can add LogGroup to this queue return error if LogGroupQueue is full

type LogGroupWithContext

type LogGroupWithContext struct {
	LogGroup *protocol.LogGroup
	Context  map[string]interface{}
}

type LogWithContext

type LogWithContext struct {
	Log     *protocol.Log
	Context map[string]interface{}
}

type MetricCreator

type MetricCreator func() MetricInput

type MetricInput

type MetricInput interface {
	// Init called for init some system resources, like socket, mutex...
	// return call interval(ms) and error flag, if interval is 0, use default interval
	Init(Context) (int, error)

	// Description returns a one-sentence description on the Input
	Description() string
}

MetricInput ...

type MetricInputV1

type MetricInputV1 interface {
	MetricInput
	// Collect takes in an accumulator and adds the metrics that the Input
	// gathers. This is called every "interval"
	Collect(Collector) error
}

type MetricInputV2

type MetricInputV2 interface {
	MetricInput
	// Collect takes in an accumulator and adds the metrics that the Input
	// gathers. This is called every "interval"
	Read(PipelineContext) error
}

type PipelineCollector

type PipelineCollector interface {

	// Collect single group and events data belonging to this group
	Collect(groupInfo *models.GroupInfo, eventList ...models.PipelineEvent)

	// CollectList collect GroupEvents list that have been grouped
	CollectList(groupEventsList ...*models.PipelineGroupEvents)

	// ToArray returns an array containing all of the PipelineGroupEvents in this collector.
	ToArray() []*models.PipelineGroupEvents

	// Observe returns a chan that can consume PipelineGroupEvents from this collector.
	Observe() chan *models.PipelineGroupEvents

	Close()
}

PipelineCollector collect data in the plugin and send the data to the next operator

type PipelineContext

type PipelineContext interface {
	Collector() PipelineCollector
}

PipelineContext which may include collector interface、checkpoint interface、config read and many more..

func NewGroupedPipelineConext

func NewGroupedPipelineConext() PipelineContext

func NewNoopPipelineConext

func NewNoopPipelineConext() PipelineContext

func NewObservePipelineConext

func NewObservePipelineConext(queueSize int) PipelineContext

type Processor

type Processor interface {
	// Init called for init some system resources, like socket, mutex...
	Init(Context) error

	// Description returns a one-sentence description on the Input
	Description() string
}

Processor also can be a filter

type ProcessorCreator

type ProcessorCreator func() Processor

type ProcessorV1

type ProcessorV1 interface {
	Processor
	// ProcessLogs the filter to the given metric
	ProcessLogs(logArray []*protocol.Log) []*protocol.Log
}

type ProcessorV2

type ProcessorV2 interface {
	Processor
	Process(in *models.PipelineGroupEvents, context PipelineContext)
}

type ServiceCreator

type ServiceCreator func() ServiceInput

type ServiceInput

type ServiceInput interface {
	// Init called for init some system resources, like socket, mutex...
	// return interval(ms) and error flag, if interval is 0, use default interval
	Init(Context) (int, error)

	// Description returns a one-sentence description on the Input
	Description() string

	// Stop stops the services and closes any necessary channels and connections
	Stop() error
}

ServiceInput ...

type ServiceInputV1

type ServiceInputV1 interface {
	ServiceInput
	// Start starts the ServiceInput's service, whatever that may be
	Start(Collector) error
}

type ServiceInputV2

type ServiceInputV2 interface {
	ServiceInput
	// StartService starts the ServiceInput's service, whatever that may be
	StartService(PipelineContext) error
}

type StringMetric

type StringMetric interface {
	Name() string

	Set(v string)

	Get() string

	Serialize(log *protocol.Log)
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL