Documentation ¶
Index ¶
- Variables
- func GetPluginPriority(pluginName string) int
- func HoldOn(exitFlag bool) error
- func Init() (err error)
- func LoadGlobalConfig(jsonStr string) int
- func LoadLogstoreConfig(project string, logstore string, configName string, logstoreKey int64, ...) error
- func Resume() error
- type AggregatorWrapper
- type AlwaysOnlineManager
- func (aom *AlwaysOnlineManager) AddCachedConfig(config *LogstoreConfig, timeout time.Duration)
- func (aom *AlwaysOnlineManager) GetCachedConfig(configName string) (config *LogstoreConfig, ok bool)
- func (aom *AlwaysOnlineManager) GetDeletedConfigs(existConfigs map[string]*LogstoreConfig) map[string]*LogstoreConfig
- type ContextImp
- func (p *ContextImp) AddPlugin(name string)
- func (p *ContextImp) GetCheckPoint(key string) (value []byte, exist bool)
- func (p *ContextImp) GetCheckPointObject(key string, obj interface{}) (exist bool)
- func (p *ContextImp) GetConfigName() string
- func (p *ContextImp) GetLogstore() string
- func (p *ContextImp) GetProject() string
- func (p *ContextImp) GetRuntimeContext() context.Context
- func (p *ContextImp) InitContext(project, logstore, configName string)
- func (p *ContextImp) MetricSerializeToPB(log *protocol.Log)
- func (p *ContextImp) RegisterCounterMetric(metric ilogtail.CounterMetric)
- func (p *ContextImp) RegisterLatencyMetric(metric ilogtail.LatencyMetric)
- func (p *ContextImp) RegisterStringMetric(metric ilogtail.StringMetric)
- func (p *ContextImp) SaveCheckPoint(key string, value []byte) error
- func (p *ContextImp) SaveCheckPointObject(key string, obj interface{}) error
- type FlusherWrapper
- type GlobalConfig
- type InputAlarm
- type InputStatistics
- type LogstoreConfig
- func (lc *LogstoreConfig) ProcessRawLog(rawLog []byte, packID string, topic string) int
- func (lc *LogstoreConfig) ProcessRawLogV2(rawLog []byte, packID string, topic string, tags []byte) int
- func (lc *LogstoreConfig) Start()
- func (lc *LogstoreConfig) Stop(exitFlag bool) error
- func (lc *LogstoreConfig) TryFlushLoggroups() bool
- type LogstoreStatistics
- type MetricWrapper
- func (p *MetricWrapper) AddData(tags map[string]string, fields map[string]string, t ...time.Time)
- func (p *MetricWrapper) AddDataArray(tags map[string]string, columns []string, values []string, t ...time.Time)
- func (p *MetricWrapper) AddRawLog(log *protocol.Log)
- func (p *MetricWrapper) Run()
- func (p *MetricWrapper) Stop()
- type ProcessorWrapper
- type ProcessorWrapperArray
- type ServiceWrapper
- func (p *ServiceWrapper) AddData(tags map[string]string, fields map[string]string, t ...time.Time)
- func (p *ServiceWrapper) AddDataArray(tags map[string]string, columns []string, values []string, t ...time.Time)
- func (p *ServiceWrapper) AddRawLog(log *protocol.Log)
- func (p *ServiceWrapper) Run()
- func (p *ServiceWrapper) Stop() error
Constants ¶
This section is empty.
Variables ¶
var BaseVersion = "0.1.0"
StatisticsConfigJson, AlarmConfigJson
var CheckPointCleanInterval = flag.Int("CheckPointCleanInterval", 600, "checkpoint clean interval, second")
var CheckPointFile = flag.String("CheckPointFile", "checkpoint", "checkpoint file name, base dir(binary dir)")
var CheckPointManager checkPointManager
var DisabledLogtailConfig = make(map[string]*LogstoreConfig)
var DisabledLogtailConfigLock sync.Mutex
Configs that were disabled because of slow or hang config.
var ErrCheckPointNotInit = errors.New("checkpoint db not init")
var LastLogtailConfig map[string]*LogstoreConfig
var LogtailConfig map[string]*LogstoreConfig
Following variables are exported so that tests of main package can reference them.
var LogtailGlobalConfig = newGlobalConfig()
LogtailGlobalConfig is the singleton instance of GlobalConfig.
var MaxCleanItemPerInterval = flag.Int("MaxCleanItemPerInterval", 1000, "max clean items per interval")
Functions ¶
func GetPluginPriority ¶
func HoldOn ¶
HoldOn stops all config instance and checkpoint manager so that it is ready
to load new configs or quit.
For user-defined config, timeoutStop is used to avoid hanging.
func LoadGlobalConfig ¶
LoadGlobalConfig updates LogtailGlobalConfig according to jsonStr (only once).
func LoadLogstoreConfig ¶
Types ¶
type AggregatorWrapper ¶
type AggregatorWrapper struct { Aggregator ilogtail.Aggregator Config *LogstoreConfig LogGroupsChan chan *protocol.LogGroup Interval time.Duration // contains filtered or unexported fields }
AggregatorWrapper wrappers Aggregator. It implements LogGroupQueue interface, and is passed to associated Aggregator. Aggregator uses Add function to pass log groups to wrapper, and then wrapper passes log groups to associated LogstoreConfig through channel LogGroupsChan. In fact, LogGroupsChan == (associated) LogstoreConfig.LogGroupsChan.
func (*AggregatorWrapper) Add ¶
func (p *AggregatorWrapper) Add(loggroup *protocol.LogGroup) error
Add inserts @loggroup to LogGroupsChan if @loggroup is not empty. It is called by associated Aggregator. It returns errAggAdd when queue is full.
func (*AggregatorWrapper) AddWithWait ¶
AddWithWait inserts @loggroup to LogGroupsChan, and it waits at most @duration. It works like Add but adds a timeout policy when log group queue is full. It returns errAggAdd when queue is full and timeout. NOTE: no body calls it now.
func (*AggregatorWrapper) Run ¶
func (p *AggregatorWrapper) Run()
Run calls periodically Aggregator.Flush to get log groups from associated aggregator and pass them to LogstoreConfig through LogGroupsChan.
func (*AggregatorWrapper) Stop ¶
func (p *AggregatorWrapper) Stop()
type AlwaysOnlineManager ¶
type AlwaysOnlineManager struct {
// contains filtered or unexported fields
}
AlwaysOnlineManager is used to manage the plugins that do not want to stop when config reloading
func GetAlwaysOnlineManager ¶
func GetAlwaysOnlineManager() *AlwaysOnlineManager
GetAlwaysOnlineManager get a AlwaysOnlineManager instance
func (*AlwaysOnlineManager) AddCachedConfig ¶
func (aom *AlwaysOnlineManager) AddCachedConfig(config *LogstoreConfig, timeout time.Duration)
AddCachedConfig add cached config into manager, manager will stop and delete this config when timeout
func (*AlwaysOnlineManager) GetCachedConfig ¶
func (aom *AlwaysOnlineManager) GetCachedConfig(configName string) (config *LogstoreConfig, ok bool)
GetCachedConfig get cached config from manager and delete this item, so manager will not close this config
func (*AlwaysOnlineManager) GetDeletedConfigs ¶
func (aom *AlwaysOnlineManager) GetDeletedConfigs( existConfigs map[string]*LogstoreConfig) map[string]*LogstoreConfig
GetDeletedConfigs returns cached configs not in @existConfigs.
type ContextImp ¶
type ContextImp struct { StringMetrics map[string]ilogtail.StringMetric CounterMetrics map[string]ilogtail.CounterMetric LatencyMetrics map[string]ilogtail.LatencyMetric // contains filtered or unexported fields }
func (*ContextImp) AddPlugin ¶
func (p *ContextImp) AddPlugin(name string)
func (*ContextImp) GetCheckPoint ¶
func (p *ContextImp) GetCheckPoint(key string) (value []byte, exist bool)
func (*ContextImp) GetCheckPointObject ¶
func (p *ContextImp) GetCheckPointObject(key string, obj interface{}) (exist bool)
func (*ContextImp) GetConfigName ¶
func (p *ContextImp) GetConfigName() string
func (*ContextImp) GetLogstore ¶
func (p *ContextImp) GetLogstore() string
func (*ContextImp) GetProject ¶
func (p *ContextImp) GetProject() string
func (*ContextImp) GetRuntimeContext ¶
func (p *ContextImp) GetRuntimeContext() context.Context
func (*ContextImp) InitContext ¶
func (p *ContextImp) InitContext(project, logstore, configName string)
func (*ContextImp) MetricSerializeToPB ¶
func (p *ContextImp) MetricSerializeToPB(log *protocol.Log)
func (*ContextImp) RegisterCounterMetric ¶
func (p *ContextImp) RegisterCounterMetric(metric ilogtail.CounterMetric)
func (*ContextImp) RegisterLatencyMetric ¶
func (p *ContextImp) RegisterLatencyMetric(metric ilogtail.LatencyMetric)
func (*ContextImp) RegisterStringMetric ¶
func (p *ContextImp) RegisterStringMetric(metric ilogtail.StringMetric)
func (*ContextImp) SaveCheckPoint ¶
func (p *ContextImp) SaveCheckPoint(key string, value []byte) error
func (*ContextImp) SaveCheckPointObject ¶
func (p *ContextImp) SaveCheckPointObject(key string, obj interface{}) error
type FlusherWrapper ¶
type GlobalConfig ¶
type GlobalConfig struct { InputIntervalMs int AggregatIntervalMs int FlushIntervalMs int DefaultLogQueueSize int DefaultLogGroupQueueSize int Tags map[string]string // Directory to store logtail data, such as checkpoint, etc. LogtailSysConfDir string // Network identification from logtail. HostIP string Hostname string AlwaysOnline bool DelayStopSec int }
GlobalConfig represents global configurations of plugin system.
type InputAlarm ¶
type InputAlarm struct {
// contains filtered or unexported fields
}
func (*InputAlarm) Description ¶
func (r *InputAlarm) Description() string
type InputStatistics ¶
type InputStatistics struct {
// contains filtered or unexported fields
}
func (*InputStatistics) Collect ¶
func (r *InputStatistics) Collect(collector ilogtail.Collector) error
func (*InputStatistics) Description ¶
func (r *InputStatistics) Description() string
type LogstoreConfig ¶
type LogstoreConfig struct { ProjectName string LogstoreName string ConfigName string LogstoreKey int64 MetricPlugins []*MetricWrapper ServicePlugins []*ServiceWrapper ProcessorPlugins []*ProcessorWrapper AggregatorPlugins []*AggregatorWrapper FlusherPlugins []*FlusherWrapper // Each LogstoreConfig can have its independent GlobalConfig if the "global" field // is offered in configuration, see build-in StatisticsConfig and AlarmConfig. GlobalConfig *GlobalConfig LogsChan chan *protocol.Log LogGroupsChan chan *protocol.LogGroup Context ilogtail.Context Statistics LogstoreStatistics FlushOutLogGroups []*protocol.LogGroup FlushOutFlag bool // contains filtered or unexported fields }
var AlarmConfig *LogstoreConfig
var StatisticsConfig *LogstoreConfig
Two built-in logtail configs to report statistics and alarm (from system and other logtail configs).
func (*LogstoreConfig) ProcessRawLog ¶
func (lc *LogstoreConfig) ProcessRawLog(rawLog []byte, packID string, topic string) int
func (*LogstoreConfig) ProcessRawLogV2 ¶
func (lc *LogstoreConfig) ProcessRawLogV2(rawLog []byte, packID string, topic string, tags []byte) int
ProcessRawLogV2 ... V1 -> V2: enable topic field, and use tags field to pass more tags. unsafe parameter: rawLog,packID and tags safe parameter: topic
func (*LogstoreConfig) Start ¶
func (lc *LogstoreConfig) Start()
Start initializes plugin instances in config and starts them. Procedures:
- Start flusher goroutine and push FlushOutLogGroups inherited from last config instance to LogGroupsChan, so that they can be flushed to flushers.
- Start aggregators, allocate new goroutine for each one.
- Start processor goroutine to process logs from LogsChan.
- Start inputs (including metrics and services), just like aggregator, each input has its own goroutine.
func (*LogstoreConfig) Stop ¶
func (lc *LogstoreConfig) Stop(exitFlag bool) error
Stop stops plugin instances and corresponding goroutines of config. @exitFlag passed from Logtail, indicates that if Logtail will quit after this. Procedures: 1. SetUrgent to all flushers to indicate them current state. 2. Stop all input plugins, stop generating logs. 3. Stop processor goroutine, pass all existing logs to aggregator. 4. Stop all aggregator plugins, make all logs to LogGroups. 5. Set stopping flag, stop flusher goroutine. 6. If Logtail is exiting and there are remaining data, try to flush once. 7. Stop flusher plugins.
func (*LogstoreConfig) TryFlushLoggroups ¶
func (lc *LogstoreConfig) TryFlushLoggroups() bool
type LogstoreStatistics ¶
type LogstoreStatistics struct { CollecLatencytMetric ilogtail.LatencyMetric RawLogMetric ilogtail.CounterMetric SplitLogMetric ilogtail.CounterMetric FlushLogMetric ilogtail.CounterMetric FlushLogGroupMetric ilogtail.CounterMetric FlushReadyMetric ilogtail.CounterMetric FlushLatencyMetric ilogtail.LatencyMetric }
func (*LogstoreStatistics) Init ¶
func (p *LogstoreStatistics) Init(context ilogtail.Context)
type MetricWrapper ¶
type MetricWrapper struct { Input ilogtail.MetricInput Config *LogstoreConfig Tags map[string]string Interval time.Duration LogsChan chan *protocol.Log LatencyMetric ilogtail.LatencyMetric // contains filtered or unexported fields }
func (*MetricWrapper) AddDataArray ¶
func (*MetricWrapper) AddRawLog ¶
func (p *MetricWrapper) AddRawLog(log *protocol.Log)
func (*MetricWrapper) Run ¶
func (p *MetricWrapper) Run()
func (*MetricWrapper) Stop ¶
func (p *MetricWrapper) Stop()
type ProcessorWrapper ¶
type ProcessorWrapperArray ¶
type ProcessorWrapperArray []*ProcessorWrapper
func (ProcessorWrapperArray) Len ¶
func (c ProcessorWrapperArray) Len() int
func (ProcessorWrapperArray) Less ¶
func (c ProcessorWrapperArray) Less(i, j int) bool
func (ProcessorWrapperArray) Swap ¶
func (c ProcessorWrapperArray) Swap(i, j int)
type ServiceWrapper ¶
type ServiceWrapper struct { Input ilogtail.ServiceInput Config *LogstoreConfig Tags map[string]string Interval time.Duration LogsChan chan *protocol.Log }
func (*ServiceWrapper) AddDataArray ¶
func (*ServiceWrapper) AddRawLog ¶
func (p *ServiceWrapper) AddRawLog(log *protocol.Log)
func (*ServiceWrapper) Run ¶
func (p *ServiceWrapper) Run()
func (*ServiceWrapper) Stop ¶
func (p *ServiceWrapper) Stop() error