file

package
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 4, 2023 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AckStart = AckTaskType("start")
	AckStop  = AckTaskType("stop")
)
View Source
const (
	JobActive = JobStatus(1)
	JobDelete = JobStatus(2)
	JobStop   = JobStatus(3)
)
View Source
const (
	IsolationPipeline = IsolationLevel("pipeline")
	IsolationSource   = IsolationLevel("source")
	IsolationShare    = IsolationLevel("share")
)
View Source
const (
	MultiStart = MultiTaskType("start")
	MultiStop  = MultiTaskType("stop")
)
View Source
const (
	START = WatchTaskType("start")
	STOP  = WatchTaskType("stop")
)
View Source
const (
	CREATE = Operation(0)
	WRITE  = Operation(1)
	REMOVE = Operation(2)
	RENAME = Operation(3)
)
View Source
const (
	HandlerRegistryPath = "/api/v1/source/file/registry"
)
View Source
const (
	SystemStateKey = event.SystemKeyPrefix + "State"
)
View Source
const Type = "file"

Variables

View Source
var NilOfTime, _ = time.ParseInLocation("2006-01-02 15:04:05", "2008-08-08 08:08:08", time.Local)

Functions

func ExportWatchMetric added in v1.3.0

func ExportWatchMetric() map[string]eventbus.WatchMetricData

ExportWatchMetric exports the file information of all pipelines/sources.

func JobUid

func JobUid(fileInfo os.FileInfo) string

func NewAckWith

func NewAckWith(state *persistence.State) *ack

func RegisterProcessor added in v1.2.0

func RegisterProcessor(factory ProcessFactory)

func ReleaseAck

func ReleaseAck(a *ack)

func StopReader

func StopReader(isolation Isolation)

func WatchJobId

func WatchJobId(pipelineName string, sourceName string, jobUid string) string

Types

type AckChainHandler

type AckChainHandler struct {
	// contains filtered or unexported fields
}

func GetOrCreateShareAckChainHandler

func GetOrCreateShareAckChainHandler(sinkCount int, ackConfig AckConfig) *AckChainHandler

func NewAckChainHandler

func NewAckChainHandler(sinkCount int, ackConfig AckConfig) *AckChainHandler

func (*AckChainHandler) StartTask

func (ach *AckChainHandler) StartTask(task *AckTask)

func (*AckChainHandler) Stop

func (ach *AckChainHandler) Stop()

func (*AckChainHandler) StopTask

func (ach *AckChainHandler) StopTask(task *AckTask)

type AckConfig

type AckConfig struct {
	Enable              bool          `yaml:"enable,omitempty" default:"true"`
	MaintenanceInterval time.Duration `yaml:"maintenanceInterval,omitempty" default:"20h"`
}

type AckListener

type AckListener struct {
	// contains filtered or unexported fields
}

func (*AckListener) BeforeQueueConvertBatch

func (al *AckListener) BeforeQueueConvertBatch(events []api.Event)

func (*AckListener) Name

func (al *AckListener) Name() string

func (*AckListener) Stop

func (al *AckListener) Stop()

type AckTask

type AckTask struct {
	Epoch        *pipeline.Epoch
	PipelineName string
	SourceName   string

	StopCountDown *sync.WaitGroup
	// contains filtered or unexported fields
}

func NewAckTask

func NewAckTask(epoch *pipeline.Epoch, pipelineName string, sourceName string, persistenceFunc persistenceFunc) *AckTask

func (*AckTask) Key

func (at *AckTask) Key() string

func (*AckTask) NewAckChain

func (at *AckTask) NewAckChain(jobWatchUid string) *JobAckChain

type AckTaskEvent added in v1.3.0

type AckTaskEvent struct {
	// contains filtered or unexported fields
}

type AckTaskType

type AckTaskType string

type AddonMetaFields added in v1.5.0

type AddonMetaFields struct {
	Pipeline  string `yaml:"pipeline,omitempty"`
	Source    string `yaml:"source,omitempty"`
	Filename  string `yaml:"filename,omitempty"`
	Timestamp string `yaml:"timestamp,omitempty"`
	Offset    string `yaml:"offset,omitempty"`
	Bytes     string `yaml:"bytes,omitempty"`
	Line      string `yaml:"line,omitempty"`
	Hostname  string `yaml:"hostname,omitempty"`
}

type AddonMetaSchema added in v1.5.0

type AddonMetaSchema struct {
	Fields          map[string]string `yaml:"fields,omitempty"`
	FieldsUnderRoot bool              `yaml:"underRoot,omitempty"`
	FieldsUnderKey  string            `yaml:"key,omitempty" default:"state"`
}

type CharsetDecoder added in v1.3.0

type CharsetDecoder struct {
	// contains filtered or unexported fields
}

func NewCharset added in v1.3.0

func NewCharset(charset string, productFunc api.ProductFunc) *CharsetDecoder

func (*CharsetDecoder) Hook added in v1.3.0

func (i *CharsetDecoder) Hook(event api.Event) api.Result

type CleanFiles

type CleanFiles struct {
	MaxHistoryDays int `yaml:"maxHistoryDays,omitempty"`
	// clean unfinished files
	CleanUnfinished bool `yaml:"cleanUnfinished,omitempty"`
}

type CollectConfig

type CollectConfig struct {
	IsolationLevel           string             `yaml:"isolationLevel,omitempty" default:"share"`
	Paths                    []string           `yaml:"paths,omitempty" validate:"required"` // glob pattern
	ExcludeFiles             []string           `yaml:"excludeFiles,omitempty"`              // regular pattern
	IgnoreOlder              *timeutil.Duration `yaml:"ignoreOlder,omitempty"`
	IgnoreSymlink            bool               `yaml:"ignoreSymlink,omitempty" default:"false"`
	RereadTruncated          bool               `yaml:"rereadTruncated,omitempty" default:"true"`                           // Read from the beginning when the file is truncated
	FirstNBytesForIdentifier int                `yaml:"firstNBytesForIdentifier,omitempty" default:"128" validate:"gte=10"` // If the file size is smaller than `firstNBytesForIdentifier`, it will not be collected
	AddonMeta                bool               `yaml:"addonMeta,omitempty"`
	AddonMetaSchema          AddonMetaSchema    `yaml:"addonMetaSchema,omitempty"`

	Charset string `yaml:"charset,omitempty" default:"utf-8"`

	ReadFromTail              bool          `yaml:"readFromTail,omitempty" default:"false"`
	CleanFiles                *CleanFiles   `yaml:"cleanFiles,omitempty"`
	FdHoldTimeoutWhenInactive time.Duration `yaml:"fdHoldTimeoutWhenInactive,omitempty" default:"5m"`
	FdHoldTimeoutWhenRemove   time.Duration `yaml:"fdHoldTimeoutWhenRemove,omitempty" default:"5m"`
	// contains filtered or unexported fields
}

func (CollectConfig) IsFileExcluded

func (cc CollectConfig) IsFileExcluded(file string) bool

func (CollectConfig) IsIgnoreOlder

func (cc CollectConfig) IsIgnoreOlder(info os.FileInfo) bool

type Config

type Config struct {
	AckConfig     AckConfig              `yaml:"ack,omitempty"`
	WatchConfig   WatchConfig            `yaml:"watcher,omitempty"`
	ReaderConfig  ReaderConfig           `yaml:",inline,omitempty"`
	CollectConfig CollectConfig          `yaml:",inline,omitempty" validate:"required,dive"`
	Isolation     string                 `yaml:"isolation,omitempty" default:"pipeline"`
	Fields        map[string]interface{} `yaml:"fields,omitempty"`
}

func (*Config) SetDefaults added in v1.5.0

func (c *Config) SetDefaults()

func (*Config) Validate added in v1.3.0

func (c *Config) Validate() error

type Isolation

type Isolation struct {
	Level        IsolationLevel
	PipelineName string
	SourceName   string
}

type IsolationLevel

type IsolationLevel string

type Job

type Job struct {
	EofCount int
	// contains filtered or unexported fields
}

func NewJob

func NewJob(task *WatchTask, filename string, fileInfo os.FileInfo) *Job

func (*Job) Active

func (j *Job) Active() (error, bool)

func (*Job) ChangeStatusTo

func (j *Job) ChangeStatusTo(status JobStatus)

func (*Job) Delete

func (j *Job) Delete()

func (*Job) File added in v1.2.0

func (j *Job) File() *os.File

func (*Job) FileName added in v1.5.0

func (j *Job) FileName() string

func (*Job) GenerateIdentifier

func (j *Job) GenerateIdentifier() error

func (*Job) GetEncodeLineEnd added in v1.3.0

func (j *Job) GetEncodeLineEnd() []byte

func (*Job) GetLineEnd added in v1.3.0

func (j *Job) GetLineEnd() []byte

func (*Job) Index

func (j *Job) Index() uint32

func (*Job) IsDelete

func (j *Job) IsDelete() bool

func (*Job) IsDeleteTimeout

func (j *Job) IsDeleteTimeout(timeout time.Duration) bool

func (*Job) IsRename

func (j *Job) IsRename() bool

func (*Job) IsSame

func (j *Job) IsSame(other *Job) bool

func (*Job) IsStop

func (j *Job) IsStop() bool

func (*Job) LastActiveTime added in v1.2.0

func (j *Job) LastActiveTime() time.Time

func (*Job) NextOffset

func (j *Job) NextOffset(offset int64)

func (*Job) ProductEvent

func (j *Job) ProductEvent(endOffset int64, collectTime time.Time, body []byte)

func (*Job) Read

func (j *Job) Read()

func (*Job) Release

func (j *Job) Release() bool

func (*Job) RenameTo

func (j *Job) RenameTo(newFilename string)

func (*Job) SetLastActiveTime added in v1.5.0

func (j *Job) SetLastActiveTime(t time.Time)

func (*Job) Stop

func (j *Job) Stop()

func (*Job) Sync

func (j *Job) Sync()

func (*Job) Uid

func (j *Job) Uid() string

func (*Job) WatchUid

func (j *Job) WatchUid() string

WatchUid supports repeated collection of the same file by different sources.

type JobAckChain

type JobAckChain struct {
	Epoch        *pipeline.Epoch
	PipelineName string
	SourceName   string
	JobWatchUid  string

	Start time.Time
	// contains filtered or unexported fields
}

func (*JobAckChain) Ack

func (ac *JobAckChain) Ack(s *persistence.State)

func (*JobAckChain) Append

func (ac *JobAckChain) Append(s *persistence.State)

func (*JobAckChain) Key

func (ac *JobAckChain) Key() string

func (*JobAckChain) Release

func (ac *JobAckChain) Release()

type JobCollectContext added in v1.2.0

type JobCollectContext struct {
	Job           *Job
	Filename      string
	LastOffset    int64
	BacklogBuffer []byte
	ReadBuffer    []byte

	// runtime property
	WasSend bool
	IsEOF   bool
}

func NewJobCollectContextAndValidate added in v1.2.0

func NewJobCollectContextAndValidate(job *Job, readBuffer, backlogBuffer []byte) (*JobCollectContext, error)

type JobStatus

type JobStatus int

type LineDelimiterValue added in v1.3.0

type LineDelimiterValue struct {
	Charset   string `yaml:"charset,omitempty" default:"utf-8"`
	LineType  string `yaml:"type,omitempty" default:"auto"`
	LineValue string `yaml:"value,omitempty" default:"\n"`
}

type LineEndings added in v1.3.0

type LineEndings struct {
	// contains filtered or unexported fields
}

func (*LineEndings) AddLineEnd added in v1.3.0

func (end *LineEndings) AddLineEnd(pipelineName string, sourceName string, lineDelimiterValue *LineDelimiterValue) error

func (*LineEndings) GetEncodeLineEnd added in v1.3.0

func (end *LineEndings) GetEncodeLineEnd(pipelineName string, sourceName string) []byte

func (*LineEndings) GetLineEnd added in v1.3.0

func (end *LineEndings) GetLineEnd(pipelineName string, sourceName string) []byte

func (*LineEndings) Init added in v1.3.0

func (end *LineEndings) Init()

func (*LineEndings) RemoveLineEnd added in v1.3.0

func (end *LineEndings) RemoveLineEnd(pipelineName string, sourceName string)

type LineEndingsValue added in v1.3.0

type LineEndingsValue struct {
	// contains filtered or unexported fields
}

type LineTerminator added in v1.3.0

type LineTerminator uint8

LineTerminator is the option storing the line terminator characters. Supported newline reference: https://en.wikipedia.org/wiki/Newline#Unicode

const (
	Custom = "custom"
	// InvalidTerminator is the invalid terminator
	InvalidTerminator LineTerminator = iota
	// AutoLineTerminator accepts both LF and CR+LF
	AutoLineTerminator
	// LineFeed is the unicode char LF
	LineFeed
	// VerticalTab is the unicode char VT
	VerticalTab
	// FormFeed is the unicode char FF
	FormFeed
	// CarriageReturn is the unicode char CR
	CarriageReturn
	// CarriageReturnLineFeed is the unicode chars CR+LF
	CarriageReturnLineFeed
	// NextLine is the unicode char NEL
	NextLine
	// LineSeparator is the unicode char LS
	LineSeparator
	// ParagraphSeparator is the unicode char PS
	ParagraphSeparator
	// NullTerminator is the unicode char NUL
	NullTerminator
)

type MultiConfig

type MultiConfig struct {
	Active   bool          `yaml:"active,omitempty" default:"false"`
	Pattern  string        `yaml:"pattern,omitempty"`
	MaxLines int           `yaml:"maxLines,omitempty" default:"500"`
	MaxBytes int64         `yaml:"maxBytes,omitempty" default:"131072"` // default 128KB
	Timeout  time.Duration `yaml:"timeout,omitempty" default:"5s"`      // default 2 * read.timeout
}

type MultiHolder

type MultiHolder struct {
	// contains filtered or unexported fields
}

type MultiProcessor

type MultiProcessor struct {
	// contains filtered or unexported fields
}

func GetOrCreateShareMultilineProcessor

func GetOrCreateShareMultilineProcessor() *MultiProcessor

func NewMultiProcessor

func NewMultiProcessor() *MultiProcessor

func (*MultiProcessor) Process

func (mp *MultiProcessor) Process(event api.Event) api.Result

func (*MultiProcessor) StartTask

func (mp *MultiProcessor) StartTask(task *MultiTask)

func (*MultiProcessor) StopTask

func (mp *MultiProcessor) StopTask(task *MultiTask)

type MultiTask

type MultiTask struct {
	// contains filtered or unexported fields
}

func NewMultiTask

func NewMultiTask(epoch *pipeline.Epoch, sourceName string, config MultiConfig, eventPool *event.Pool, productFunc api.ProductFunc) *MultiTask

func (*MultiTask) String

func (mt *MultiTask) String() string

type MultiTaskType

type MultiTaskType string

type Operation

type Operation int

type ProcessChain added in v1.2.0

type ProcessChain interface {
	Process(ctx *JobCollectContext)
}

func NewProcessChain added in v1.2.0

func NewProcessChain(config ReaderConfig) ProcessChain

type ProcessFactory added in v1.2.0

type ProcessFactory func(config ReaderConfig) Processor

type Processor added in v1.2.0

type Processor interface {
	Order() int
	Code() string
	Process(processorChain ProcessChain, ctx *JobCollectContext)
}

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

func GetOrCreateReader

func GetOrCreateReader(isolation Isolation, readerConfig ReaderConfig, watcher *Watcher) *Reader

func GetOrCreateShareReader

func GetOrCreateShareReader(readerConfig ReaderConfig, watcher *Watcher) *Reader

func (*Reader) Start

func (r *Reader) Start()

func (*Reader) Stop

func (r *Reader) Stop()

type ReaderConfig

type ReaderConfig struct {
	LineDelimiter          LineDelimiterValue `yaml:"lineDelimiter,omitempty"`
	MaxSingleLineBytes     int64              `yaml:"maxSingleLineBytes,omitempty" default:"67108864" validate:"gt=65536"` // default 67108864 = 64MB
	WorkerCount            int                `yaml:"workerCount,omitempty" default:"1"`
	ReadChanSize           int                `yaml:"readChanSize,omitempty" default:"512"`     // deprecated
	ReadBufferSize         int                `yaml:"readBufferSize,omitempty" default:"65536"` // The buffer size used for the file reading. default 65536 = 64k = 16*PAGE_SIZE
	MaxContinueRead        int                `yaml:"maxContinueRead,omitempty" default:"16"`
	MaxContinueReadTimeout time.Duration      `yaml:"maxContinueReadTimeout,omitempty" default:"3s"`
	InactiveTimeout        time.Duration      `yaml:"inactiveTimeout,omitempty" default:"3s"`
	MultiConfig            MultiConfig        `yaml:"multi,omitempty"`
	CleanDataTimeout       time.Duration      `yaml:"cleanDataTimeout,omitempty" default:"5s"`
	// contains filtered or unexported fields
}

type Source

type Source struct {
	// contains filtered or unexported fields
}

func (*Source) Category

func (s *Source) Category() api.Category

func (*Source) Commit

func (s *Source) Commit(events []api.Event)

func (*Source) Config

func (s *Source) Config() interface{}

func (*Source) HandleHttp

func (s *Source) HandleHttp()

func (*Source) Init

func (s *Source) Init(context api.Context) error

func (*Source) Product

func (s *Source) Product() api.Event

func (*Source) ProductLoop

func (s *Source) ProductLoop(productFunc api.ProductFunc)

func (*Source) SetCodec added in v1.2.0

func (s *Source) SetCodec(c codec.Codec)

func (*Source) SetSourceConfig added in v1.3.0

func (s *Source) SetSourceConfig(config *source.Config)

func (*Source) Start

func (s *Source) Start() error

func (*Source) Stop

func (s *Source) Stop()

func (*Source) String

func (s *Source) String() string

func (*Source) Type

func (s *Source) Type() api.Type

type WatchConfig

type WatchConfig struct {
	EnableOsWatch             bool          `yaml:"enableOsWatch,omitempty" default:"true"`
	ScanTimeInterval          time.Duration `yaml:"scanTimeInterval,omitempty" default:"10s"`
	MaintenanceInterval       time.Duration `yaml:"maintenanceInterval,omitempty" default:"5m"`
	CleanFiles                *CleanFiles   `yaml:"cleanFiles,omitempty"`                             // deprecated, moved to CollectConfig
	FdHoldTimeoutWhenInactive time.Duration `yaml:"fdHoldTimeoutWhenInactive,omitempty" default:"5m"` // deprecated, moved to CollectConfig
	FdHoldTimeoutWhenRemove   time.Duration `yaml:"fdHoldTimeoutWhenRemove,omitempty" default:"5m"`   // deprecated, moved to CollectConfig
	MaxOpenFds                int           `yaml:"maxOpenFds,omitempty" default:"4096"`
	MaxEofCount               int           `yaml:"maxEofCount,omitempty" default:"3"`
	CleanWhenRemoved          bool          `yaml:"cleanWhenRemoved,omitempty" default:"true"`
	ReadFromTail              bool          `yaml:"readFromTail,omitempty" default:"false"` // deprecated, moved to CollectConfig
	TaskStopTimeout           time.Duration `yaml:"taskStopTimeout,omitempty" default:"30s"`
	CleanDataTimeout          time.Duration `yaml:"cleanDataTimeout,omitempty" default:"5s"`
}

type WatchTask

type WatchTask struct {
	// contains filtered or unexported fields
}

func NewWatchTask

func NewWatchTask(epoch *pipeline.Epoch, pipelineName string, sourceName string, config CollectConfig,
	eventPool *event.Pool, productFunc api.ProductFunc, activeChan chan *Job, sourceFields map[string]interface{}) *WatchTask

func (*WatchTask) IsStop added in v1.3.0

func (wt *WatchTask) IsStop() bool

func (*WatchTask) StopJobsInfo

func (wt *WatchTask) StopJobsInfo() string

func (*WatchTask) String

func (wt *WatchTask) String() string

func (*WatchTask) WatchTaskKey

func (wt *WatchTask) WatchTaskKey() string

type WatchTaskEvent added in v1.3.0

type WatchTaskEvent struct {
	// contains filtered or unexported fields
}

type WatchTaskType

type WatchTaskType string

type Watcher

type Watcher struct {
	// contains filtered or unexported fields
}

func GetOrCreateShareWatcher

func GetOrCreateShareWatcher(watchConfig WatchConfig) *Watcher

func (*Watcher) DecideJob added in v1.5.0

func (w *Watcher) DecideJob(job *Job)

DecideJob should be as lightweight as possible. Operations such as releasing file handles should be performed in a separate goroutine of the watcher.

func (*Watcher) StartWatchTask

func (w *Watcher) StartWatchTask(watchTask *WatchTask)

func (*Watcher) Stop

func (w *Watcher) Stop()

func (*Watcher) StopWatchTask

func (w *Watcher) StopWatchTask(watchTask *WatchTask)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL