pluginmanager

package
v1.0.33 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 11, 2022 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var BaseVersion = "0.1.0"

StatisticsConfigJson, AlarmConfigJson

View Source
var CheckPointCleanInterval = flag.Int("CheckPointCleanInterval", 600, "checkpoint clean interval, second")
View Source
var CheckPointFile = flag.String("CheckPointFile", "checkpoint", "checkpoint file name, base dir(binary dir)")
View Source
var CheckPointManager checkPointManager
View Source
var DisabledLogtailConfig = make(map[string]*LogstoreConfig)
View Source
var DisabledLogtailConfigLock sync.Mutex

Configs that were disabled because they were slow or hung.

View Source
var ErrCheckPointNotInit = errors.New("checkpoint db not init")
View Source
var LastLogtailConfig map[string]*LogstoreConfig
View Source
var LogtailConfig map[string]*LogstoreConfig

Following variables are exported so that tests of main package can reference them.

View Source
var LogtailGlobalConfig = newGlobalConfig()

LogtailGlobalConfig is the singleton instance of GlobalConfig.

View Source
var MaxCleanItemPerInterval = flag.Int("MaxCleanItemPerInterval", 1000, "max clean items per interval")

Functions

func GetPluginPriority

func GetPluginPriority(pluginName string) int

func HoldOn

func HoldOn(exitFlag bool) error

HoldOn stops all config instances and the checkpoint manager so that it is ready to load new configs or quit.

For user-defined config, timeoutStop is used to avoid hanging.

func Init

func Init() (err error)

Init initializes plugin manager.

func LoadGlobalConfig

func LoadGlobalConfig(jsonStr string) int

LoadGlobalConfig updates LogtailGlobalConfig according to jsonStr (only once).

func LoadLogstoreConfig

func LoadLogstoreConfig(project string, logstore string, configName string, logstoreKey int64, jsonStr string) error

func Resume

func Resume() error

Resume starts all configs.

Types

type AggregatorWrapper

type AggregatorWrapper struct {
	Aggregator    ilogtail.Aggregator
	Config        *LogstoreConfig
	LogGroupsChan chan *protocol.LogGroup
	Interval      time.Duration
	// contains filtered or unexported fields
}

AggregatorWrapper wraps Aggregator. It implements the LogGroupQueue interface, and is passed to the associated Aggregator. The Aggregator uses the Add function to pass log groups to the wrapper, and then the wrapper passes log groups to the associated LogstoreConfig through the channel LogGroupsChan. In fact, LogGroupsChan == (associated) LogstoreConfig.LogGroupsChan.

func (*AggregatorWrapper) Add

func (p *AggregatorWrapper) Add(loggroup *protocol.LogGroup) error

Add inserts @loggroup into LogGroupsChan if @loggroup is not empty. It is called by the associated Aggregator. It returns errAggAdd when the queue is full.

func (*AggregatorWrapper) AddWithWait

func (p *AggregatorWrapper) AddWithWait(loggroup *protocol.LogGroup, duration time.Duration) error

AddWithWait inserts @loggroup into LogGroupsChan, waiting at most @duration. It works like Add but adds a timeout policy for when the log group queue is full. It returns errAggAdd when the queue is full and the timeout expires. NOTE: nobody calls it now.

func (*AggregatorWrapper) Run

func (p *AggregatorWrapper) Run()

Run periodically calls Aggregator.Flush to get log groups from the associated aggregator and passes them to LogstoreConfig through LogGroupsChan.

func (*AggregatorWrapper) Stop

func (p *AggregatorWrapper) Stop()

type AlwaysOnlineManager

type AlwaysOnlineManager struct {
	// contains filtered or unexported fields
}

AlwaysOnlineManager is used to manage the plugins that do not want to stop when configs are reloaded.

func GetAlwaysOnlineManager

func GetAlwaysOnlineManager() *AlwaysOnlineManager

GetAlwaysOnlineManager gets an AlwaysOnlineManager instance.

func (*AlwaysOnlineManager) AddCachedConfig

func (aom *AlwaysOnlineManager) AddCachedConfig(config *LogstoreConfig, timeout time.Duration)

AddCachedConfig adds a cached config to the manager; the manager will stop and delete this config when the timeout expires.

func (*AlwaysOnlineManager) GetCachedConfig

func (aom *AlwaysOnlineManager) GetCachedConfig(configName string) (config *LogstoreConfig, ok bool)

GetCachedConfig gets a cached config from the manager and deletes the item, so the manager will not close this config.

func (*AlwaysOnlineManager) GetDeletedConfigs

func (aom *AlwaysOnlineManager) GetDeletedConfigs(
	existConfigs map[string]*LogstoreConfig) map[string]*LogstoreConfig

GetDeletedConfigs returns cached configs not in @existConfigs.

type ContextImp

type ContextImp struct {
	StringMetrics  map[string]ilogtail.StringMetric
	CounterMetrics map[string]ilogtail.CounterMetric
	LatencyMetrics map[string]ilogtail.LatencyMetric
	// contains filtered or unexported fields
}

func (*ContextImp) AddPlugin

func (p *ContextImp) AddPlugin(name string)

func (*ContextImp) GetCheckPoint

func (p *ContextImp) GetCheckPoint(key string) (value []byte, exist bool)

func (*ContextImp) GetCheckPointObject

func (p *ContextImp) GetCheckPointObject(key string, obj interface{}) (exist bool)

func (*ContextImp) GetConfigName

func (p *ContextImp) GetConfigName() string

func (*ContextImp) GetLogstore

func (p *ContextImp) GetLogstore() string

func (*ContextImp) GetProject

func (p *ContextImp) GetProject() string

func (*ContextImp) GetRuntimeContext

func (p *ContextImp) GetRuntimeContext() context.Context

func (*ContextImp) InitContext

func (p *ContextImp) InitContext(project, logstore, configName string)

func (*ContextImp) MetricSerializeToPB

func (p *ContextImp) MetricSerializeToPB(log *protocol.Log)

func (*ContextImp) RegisterCounterMetric

func (p *ContextImp) RegisterCounterMetric(metric ilogtail.CounterMetric)

func (*ContextImp) RegisterLatencyMetric

func (p *ContextImp) RegisterLatencyMetric(metric ilogtail.LatencyMetric)

func (*ContextImp) RegisterStringMetric

func (p *ContextImp) RegisterStringMetric(metric ilogtail.StringMetric)

func (*ContextImp) SaveCheckPoint

func (p *ContextImp) SaveCheckPoint(key string, value []byte) error

func (*ContextImp) SaveCheckPointObject

func (p *ContextImp) SaveCheckPointObject(key string, obj interface{}) error

type FlusherWrapper

type FlusherWrapper struct {
	Flusher       ilogtail.Flusher
	Config        *LogstoreConfig
	LogGroupsChan chan *protocol.LogGroup
	Interval      time.Duration
}

type GlobalConfig

type GlobalConfig struct {
	InputIntervalMs          int
	AggregatIntervalMs       int
	FlushIntervalMs          int
	DefaultLogQueueSize      int
	DefaultLogGroupQueueSize int
	Tags                     map[string]string
	// Directory to store logtail data, such as checkpoint, etc.
	LogtailSysConfDir string
	// Network identification from logtail.
	HostIP       string
	Hostname     string
	AlwaysOnline bool
	DelayStopSec int
}

GlobalConfig represents global configurations of plugin system.

type InputAlarm

type InputAlarm struct {
	// contains filtered or unexported fields
}

func (*InputAlarm) Collect

func (r *InputAlarm) Collect(collector ilogtail.Collector) error

func (*InputAlarm) Description

func (r *InputAlarm) Description() string

func (*InputAlarm) Init

func (r *InputAlarm) Init(context ilogtail.Context) (int, error)

type InputStatistics

type InputStatistics struct {
	// contains filtered or unexported fields
}

func (*InputStatistics) Collect

func (r *InputStatistics) Collect(collector ilogtail.Collector) error

func (*InputStatistics) Description

func (r *InputStatistics) Description() string

func (*InputStatistics) Init

func (r *InputStatistics) Init(context ilogtail.Context) (int, error)

type LogstoreConfig

type LogstoreConfig struct {
	ProjectName       string
	LogstoreName      string
	ConfigName        string
	LogstoreKey       int64
	MetricPlugins     []*MetricWrapper
	ServicePlugins    []*ServiceWrapper
	ProcessorPlugins  []*ProcessorWrapper
	AggregatorPlugins []*AggregatorWrapper
	FlusherPlugins    []*FlusherWrapper

	// Each LogstoreConfig can have its independent GlobalConfig if the "global" field
	//   is offered in configuration, see build-in StatisticsConfig and AlarmConfig.
	GlobalConfig *GlobalConfig

	LogsChan      chan *protocol.Log
	LogGroupsChan chan *protocol.LogGroup

	Context           ilogtail.Context
	Statistics        LogstoreStatistics
	FlushOutLogGroups []*protocol.LogGroup
	FlushOutFlag      bool
	// contains filtered or unexported fields
}
var AlarmConfig *LogstoreConfig
var StatisticsConfig *LogstoreConfig

Two built-in logtail configs to report statistics and alarm (from system and other logtail configs).

func (*LogstoreConfig) ProcessRawLog

func (lc *LogstoreConfig) ProcessRawLog(rawLog []byte, packID string, topic string) int

func (*LogstoreConfig) ProcessRawLogV2

func (lc *LogstoreConfig) ProcessRawLogV2(rawLog []byte, packID string, topic string, tags []byte) int

ProcessRawLogV2 ... V1 -> V2: enable the topic field, and use the tags field to pass more tags. Unsafe parameters: rawLog, packID and tags. Safe parameter: topic.

func (*LogstoreConfig) Start

func (lc *LogstoreConfig) Start()

Start initializes plugin instances in config and starts them. Procedures:

  1. Start flusher goroutine and push FlushOutLogGroups inherited from last config instance to LogGroupsChan, so that they can be flushed to flushers.
  2. Start aggregators, allocate new goroutine for each one.
  3. Start processor goroutine to process logs from LogsChan.
  4. Start inputs (including metrics and services), just like aggregator, each input has its own goroutine.

func (*LogstoreConfig) Stop

func (lc *LogstoreConfig) Stop(exitFlag bool) error

Stop stops plugin instances and corresponding goroutines of config. @exitFlag, passed from Logtail, indicates whether Logtail will quit after this. Procedures:

  1. SetUrgent to all flushers to indicate the current state to them.
  2. Stop all input plugins, so that no more logs are generated.
  3. Stop processor goroutine, pass all existing logs to aggregator.
  4. Stop all aggregator plugins, turning all logs into LogGroups.
  5. Set stopping flag, stop flusher goroutine.
  6. If Logtail is exiting and there is remaining data, try to flush once.
  7. Stop flusher plugins.

func (*LogstoreConfig) TryFlushLoggroups

func (lc *LogstoreConfig) TryFlushLoggroups() bool

type LogstoreStatistics

type LogstoreStatistics struct {
	CollecLatencytMetric ilogtail.LatencyMetric
	RawLogMetric         ilogtail.CounterMetric
	SplitLogMetric       ilogtail.CounterMetric
	FlushLogMetric       ilogtail.CounterMetric
	FlushLogGroupMetric  ilogtail.CounterMetric
	FlushReadyMetric     ilogtail.CounterMetric
	FlushLatencyMetric   ilogtail.LatencyMetric
}

func (*LogstoreStatistics) Init

func (p *LogstoreStatistics) Init(context ilogtail.Context)

type MetricWrapper

type MetricWrapper struct {
	Input    ilogtail.MetricInput
	Config   *LogstoreConfig
	Tags     map[string]string
	Interval time.Duration

	LogsChan      chan *protocol.Log
	LatencyMetric ilogtail.LatencyMetric
	// contains filtered or unexported fields
}

func (*MetricWrapper) AddData

func (p *MetricWrapper) AddData(tags map[string]string, fields map[string]string, t ...time.Time)

func (*MetricWrapper) AddDataArray

func (p *MetricWrapper) AddDataArray(tags map[string]string,
	columns []string,
	values []string,
	t ...time.Time)

func (*MetricWrapper) AddRawLog

func (p *MetricWrapper) AddRawLog(log *protocol.Log)

func (*MetricWrapper) Run

func (p *MetricWrapper) Run()

func (*MetricWrapper) Stop

func (p *MetricWrapper) Stop()

type ProcessorWrapper

type ProcessorWrapper struct {
	Processor ilogtail.Processor
	Config    *LogstoreConfig
	LogsChan  chan *protocol.Log
	Priority  int
}

type ProcessorWrapperArray

type ProcessorWrapperArray []*ProcessorWrapper

func (ProcessorWrapperArray) Len

func (c ProcessorWrapperArray) Len() int

func (ProcessorWrapperArray) Less

func (c ProcessorWrapperArray) Less(i, j int) bool

func (ProcessorWrapperArray) Swap

func (c ProcessorWrapperArray) Swap(i, j int)

type ServiceWrapper

type ServiceWrapper struct {
	Input    ilogtail.ServiceInput
	Config   *LogstoreConfig
	Tags     map[string]string
	Interval time.Duration

	LogsChan chan *protocol.Log
}

func (*ServiceWrapper) AddData

func (p *ServiceWrapper) AddData(tags map[string]string, fields map[string]string, t ...time.Time)

func (*ServiceWrapper) AddDataArray

func (p *ServiceWrapper) AddDataArray(tags map[string]string,
	columns []string,
	values []string,
	t ...time.Time)

func (*ServiceWrapper) AddRawLog

func (p *ServiceWrapper) AddRawLog(log *protocol.Log)

func (*ServiceWrapper) Run

func (p *ServiceWrapper) Run()

func (*ServiceWrapper) Stop

func (p *ServiceWrapper) Stop() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL