package file

Version: v0.1.11
Published: Jul 16, 2021 License: BSD-3-Clause Imports: 18 Imported by: 0

README

File plugin

It watches for files in the provided directory and reads them line by line.

Each line should contain only one event. It also correctly handles rotations (rename/truncate) and symlinks.

From time to time, it briefly releases and reopens the descriptors of fully processed files. This allows third-party software to delete those files even while file.d is still running (in that case the reopening simply fails).

The watcher uses file system events to detect file creation and updates. However, update events don't fire for symlinks, so the watcher also periodically fstats all tracked files to detect changes.

⚠ It supports the commitment mechanism, but at-least-once delivery is guaranteed only if files aren't being truncated. file.d handles file truncation correctly, yet a small chance of data loss remains. This isn't a file.d issue: the data may have been written just before the truncation, in which case some events are missed. If you care about delivery, you should also know that the logrotate manual explicitly states that copytruncate may cause data loss even during a normal rotation. So use copytruncate or similar mechanisms only if your data isn't critical.

Reading docker container log files:

pipelines:
  example_docker_pipeline:
    input:
        type: file
        watching_dir: /var/lib/docker/containers
        offsets_file: /data/offsets.yaml
        filename_pattern: "*-json.log"
        persistence_mode: async
Config params

watching_dir string required

The source directory to watch for files to process. All subdirectories are watched as well. E.g. if files have the /var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log structure, watching_dir should be /var/my-logs. The filename_pattern/dir_pattern options are useful to filter out needless files/subdirectories. When using two or more different directories, it's recommended to set up a separate pipeline for each.


offsets_file string required

The filename to store offsets of processed files. Offsets are loaded only on initialization.

It's a yaml file. You can modify it manually.


filename_pattern string default=*

Files that don't meet this pattern will be ignored.

Check out func Glob docs for details.


dir_pattern string default=*

Dirs that don't meet this pattern will be ignored.

Check out func Glob docs for details.


persistence_mode string default=async options=async|sync

It defines how to save the offsets file:

  • async – periodically saves the offsets every async_interval. The save is skipped if the offsets haven't changed. Suitable for most cases; it guarantees at-least-once delivery and adds almost no overhead.
  • sync – saves the offsets as part of event commitment. It's very slow, but it excludes event duplication in extreme situations such as power loss.

Save operation takes three steps:

  • Write the temporary file with all offsets;
  • Call fsync() on it;
  • Rename the temporary file to the original one.

read_buffer_size int default=131072

The buffer size used for the file reading.

Each worker uses its own buffer, so the total memory consumption is read_buffer_size*workers_count.


max_files int default=16384

The maximum number of open files. If the limit is exceeded, file.d exits with a fatal error.

Also, it checks your system's file descriptors limit: ulimit -n.


offsets_op string default=continue options=continue|tail|reset

An offset operation which will be performed when you add a file as a job:

  • continue – uses an offset file
  • tail – sets an offset to the end of the file
  • reset – resets an offset to the beginning of the file

It is only used on the initial scan of watching_dir. Files picked up later during work always use the reset operation.
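For example, to skip historical data on a first deployment and read only events that arrive after startup, the pipeline from the example above could set offsets_op (illustrative config):

```yaml
pipelines:
  example_docker_pipeline:
    input:
      type: file
      watching_dir: /var/lib/docker/containers
      offsets_file: /data/offsets.yaml
      offsets_op: tail   # jump to the end of each file found on the initial scan
```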


workers_count cfg.Expression default=gomaxprocs*8

It defines how many workers will be instantiated. Each worker:

  • Reads files (I/O bound)
  • Decodes events (CPU bound)

We recommend setting it to 4x-8x of the CPU core count.


report_interval cfg.Duration default=10s

It defines how often to report statistical information to stdout.


maintenance_interval cfg.Duration default=10s

It defines how often to perform maintenance. For now, maintenance consists of two stages:

  • Symlinks
  • Jobs

Symlink maintenance detects whether the underlying file of a symlink has changed. Job maintenance fstats tracked files to detect whether a new portion of data has been written to the file. If a job is in the done state, it releases and reopens the file descriptor to allow third-party software to delete the file.



Generated using insane-doc

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Factory

func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig)

func NewJobProvider

func NewJobProvider(config *Config, controller pipeline.InputPluginController, logger *zap.SugaredLogger) *jobProvider

func NewWatcher

func NewWatcher(path string, filenamePattern string, dirPattern string, notifyFn notify, logger *zap.SugaredLogger) *watcher

Types

type Config

type Config struct {

	//> @3@4@5@6
	//>
	//> The source directory to watch for files to process. All subdirectories are watched as well. E.g. if files have the
	//> `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
	//> The `filename_pattern`/`dir_pattern` options are useful to filter out needless files/subdirectories. When using two or more
	//> different directories, it's recommended to set up a separate pipeline for each.
	WatchingDir string `json:"watching_dir" required:"true"` //*

	//> @3@4@5@6
	//>
	//> The filename to store offsets of processed files. Offsets are loaded only on initialization.
	//> > It's a `yaml` file. You can modify it manually.
	OffsetsFile    string `json:"offsets_file" required:"true"` //*
	OffsetsFileTmp string

	//> @3@4@5@6
	//>
	//> Files that don't meet this pattern will be ignored.
	//> > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
	FilenamePattern string `json:"filename_pattern" default:"*"` //*

	//> @3@4@5@6
	//>
	//> Dirs that don't meet this pattern will be ignored.
	//> > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
	DirPattern string `json:"dir_pattern" default:"*"` //*

	//> @3@4@5@6
	//>
	//> It defines how to save the offsets file:
	//> @persistenceMode|comment-list
	//>
	//> Save operation takes three steps:
	//> *  Write the temporary file with all offsets;
	//> *  Call `fsync()` on it;
	//> *  Rename the temporary file to the original one.
	PersistenceMode  string `json:"persistence_mode" default:"async" options:"async|sync"` //*
	PersistenceMode_ persistenceMode

	AsyncInterval  cfg.Duration `json:"async_interval" default:"1s" parse:"duration"` // *! @3 @4 @5 @6 <br> <br> Offsets saving interval. Only used if `persistence_mode` is set to `async`.
	AsyncInterval_ time.Duration

	//> @3@4@5@6
	//>
	//> The buffer size used for the file reading.
	//> > Each worker uses its own buffer, so the total memory consumption is `read_buffer_size*workers_count`.
	ReadBufferSize int `json:"read_buffer_size" default:"131072"` //*

	//> @3@4@5@6
	//>
	//> The maximum number of open files. If the limit is exceeded, `file.d` exits with a fatal error.
	//> > Also, it checks your system's file descriptors limit: `ulimit -n`.
	MaxFiles int `json:"max_files" default:"16384"` //*

	//> @3@4@5@6
	//>
	//> An offset operation which will be performed when you add a file as a job:
	//> @offsetsOp|comment-list
	//> > It is only used on the initial scan of `watching_dir`. Files picked up later during work always use the `reset` operation.
	OffsetsOp  string `json:"offsets_op" default:"continue" options:"continue|tail|reset"` //*
	OffsetsOp_ offsetsOp

	//> @3@4@5@6
	//>
	//> It defines how many workers will be instantiated.
	//> Each worker:
	//> * Reads files (I/O bound)
	//> * Decodes events (CPU bound)
	//> > We recommend setting it to 4x-8x of the CPU core count.
	WorkersCount  cfg.Expression `json:"workers_count" default:"gomaxprocs*8" parse:"expression"` //*
	WorkersCount_ int

	//> @3@4@5@6
	//>
	//> It defines how often to report statistical information to stdout.
	ReportInterval  cfg.Duration `json:"report_interval" default:"10s" parse:"duration"` //*
	ReportInterval_ time.Duration

	//> @3@4@5@6
	//>
	//> It defines how often to perform maintenance.
	//> @maintenance
	MaintenanceInterval  cfg.Duration `json:"maintenance_interval" default:"10s" parse:"duration"` //*
	MaintenanceInterval_ time.Duration
}

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

func (*Plugin) Commit

func (p *Plugin) Commit(event *pipeline.Event)

func (*Plugin) Start

func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginParams)

func (*Plugin) Stop

func (p *Plugin) Stop()
