Versions in this module Expand all Collapse all v0 v0.0.1 Jul 2, 2024 Changes in this version + const FormatAuto + const FormatGolangLayout + const FormatUnix + const FormatUnixMilli + const ReportTaskInfoInterval + const TimeParseError + const TypeAuto + const TypeElect + const TypeProcessTime + var ErrIndexOutOfBound = errors.New("index ouf of bound") + var LogParseNotMatched = errors.New("LogParseNotMatched") + type Consumer struct + BeforeParseWhere XWhere + GroupBy XGroupBy + LogParser LogParser + Select XSelect + TimeParser TimeParser + Where XWhere + Window *XWindow + func (c *Consumer) AddBatchDetailDatus(expectedTs int64, datum []*model.DetailData) + func (c *Consumer) Consume(resp *logstream.ReadResponse, iw *inputWrapper, err error) + func (c *Consumer) LoadState(state *consumerStateObj) error + func (c *Consumer) SaveState() (*consumerStateObj, error) + func (c *Consumer) SetOutput(output output.Output) + func (c *Consumer) SetStorage(s *storage.Storage) + func (c *Consumer) Start() + func (c *Consumer) Stop() + func (c *Consumer) Update(o *Consumer) + type ConsumerStat struct + AggWhereError int32 + Broken bool + Bytes int64 + Emit int32 + EmitError int32 + EmitSuccess int32 + FilterBeforeParseWhere int32 + FilterDelay int32 + FilterGroup int32 + FilterGroupMaxKeys int32 + FilterIgnore int32 + FilterLogParseError int32 + FilterMultiline int32 + FilterTimeParseError int32 + FilterWhere int32 + Groups int32 + IoEmpty int32 + IoError int32 + IoTotal int32 + Lines int32 + Miss bool + Processed int32 + SelectError int32 + ZeroBytes int + type DataAccumulator interface + AddBatchDetailDatus func([]*model.DetailData) + type DataNode interface + GetCount func() int32 + GetNumber func() float64 + GetString func() string + type DataNodeImpl struct + Count int32 + String string + Value float64 + func (d *DataNodeImpl) GetCount() int32 + func (d *DataNodeImpl) GetNumber() float64 + func (d *DataNodeImpl) GetString() string + type DelayCalculator struct + func NewDelayCalculator() *DelayCalculator + type DryRunExecutor struct + RootEvent *event.Event + func NewDryRunExecutor(request *DryRunRequest) (*DryRunExecutor, error) + func (e *DryRunExecutor) Run() *DryRunResponse + type DryRunRequest struct + Input *Input + Task *collecttask.CollectTask + type DryRunResponse struct + Event *event.Event + GroupResult []*GroupResult + type EmitContext struct + type GroupResult struct + GroupBy map[string]interface{} + GroupLines []string + Paas bool + SelectedValues map[string]interface{} + type Input struct + Plain *Input_Plain + Read *Input_Read + Type string + type Input_Plain struct + Lines []string + Timezone string + type Input_Read struct + MaxLines int + type LogConsumer interface + type LogConsumerManager struct + type LogContext struct + func (c *LogContext) GetColumnByIndex(index int) (string, error) + func (c *LogContext) GetColumnByName(name string) (interface{}, error) + func (c *LogContext) GetLine() string + type LogGroup struct + Line string + Lines []string + func (l *LogGroup) Add(line string) + func (l *LogGroup) FirstLine() string + func (l *LogGroup) SetOneLine(line string) + type LogParser interface + Parse func(ctx *LogContext) error + type LogPathDetector struct + func NewLogDetector(key string, from *collectconfig.From, target *collecttask.CollectTarget) *LogPathDetector + func (ld *LogPathDetector) Detect() []filematch.FatPath + type LogPipeline struct + func NewPipeline(st *api.SubTask, s *storage.Storage, lsm *logstream.Manager) (*LogPipeline, error) + func (p *LogPipeline) Key() string + func (p *LogPipeline) LoadState(store transfer.StateStore) error + func (p *LogPipeline) SetupConsumer(st *api.SubTask) error + func (p *LogPipeline) Start() error + func (p *LogPipeline) Stop() + func (p *LogPipeline) StopAndSaveState(store transfer.StateStore) error + func (p *LogPipeline) Update(f func(api.Pipeline)) + func (p *LogPipeline) View(f func(api.Pipeline)) + type LogSource interface + Start func() + Stop func() + type LogTaskPipeline struct + ConsumerManager LogConsumerManager + Source LogSource + type ParsedConf struct + ParsedPatterns []*ParsedPatternConf + type ParsedPatternConf struct + type PeriodStatus struct + EmitError int + EmitSuccess bool + Stat ConsumerStat + Watermark int64 + type PullLogSource interface + type RunInLock func(func()) + type SubConsumer interface + Emit func(expectedTs int64) bool + MaybeFlush func() + ProcessGroup func(iw *inputWrapper, ctx *LogContext, maxTs *int64) + Update func(f func()) + type TimeParser interface + Parse func(*LogContext) (int64, error) + type XElect interface + Elect func(ctx *LogContext) (interface{}, error) + ElectNumber func(ctx *LogContext) (float64, error) + ElectString func(ctx *LogContext) (string, error) + Init func() + type XGroupBy interface + Execute func(ctx *LogContext) ([]string, error) + GroupNames func() []string + MaxKeySize func() int + type XSelect interface + Select func(ctx *LogContext) ([]DataNode, error) + type XTransformFilter interface + Filter func(ctx *LogContext) (interface{}, error) + Init func() error + type XWhere interface + Test func(ctx *LogContext) (bool, error) + func MustParseWhere(w *collectconfig.Where) XWhere + type XWindow struct + Interval time.Duration