README
¶
File plugin
It watches for files in the provided directory and reads them line by line.
Each line should contain only one event. It also correctly handles rotations (rename/truncate) and symlinks.
From time to time, it releases and immediately reopens descriptors of the completely processed files.
This allows files to be deleted by third-party software even while file.d
is still running (in this case the reopening will fail).
The watcher tries to use file system events to detect file creation and updates.
But update events don't work with symlinks, so the watcher also periodically calls fstat
on all tracked files to detect changes.
⚠ It supports the commitment mechanism. But at-least-once delivery is guaranteed only if files aren't being truncated. Although file.d correctly handles file truncation, there is a small chance of data loss. It isn't a file.d issue: data may have been written just before the file was truncated, in which case some events may never be read. If you care about delivery, you should also know that the logrotate manual clearly states that copy/truncate may cause data loss even at the rotation stage. So use copy/truncate or similar actions only if your data isn't critical. To reduce the potential harm of truncation, you can turn on notifications of file changes (see should_watch_file_changes). By default the plugin is notified only on file creations. Note that watching for changes is more CPU intensive.
⚠ Use the add_file_name plugin if you want to add the filename to events.
Reading docker container log files:
pipelines:
  example_docker_pipeline:
    input:
      type: file
      watching_dir: /var/lib/docker/containers
      offsets_file: /data/offsets.yaml
      filename_pattern: "*-json.log"
      persistence_mode: async
Config params
watching_dir
string
required
The source directory to watch for files to process. All subdirectories will also be watched. E.g. if files have the
/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log structure, watching_dir should be /var/my-logs.
Also, filename_pattern/dir_pattern are useful for filtering out unneeded files/subdirectories. When using two or more
different directories, it's recommended to set up a separate pipeline for each.
offsets_file
string
required
The filename to store offsets of processed files. Offsets are loaded only on initialization.
It's a yaml file. You can modify it manually.
filename_pattern
string
default=*
Files that don't meet this pattern will be ignored.
Check out func Glob docs for details.
dir_pattern
string
default=*
Dirs that don't meet this pattern will be ignored.
Check out func Glob docs for details.
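To get a feel for how glob patterns such as `*-json.log` behave, here is a small sketch built on the standard `filepath.Match` function, which uses the same pattern syntax as `filepath.Glob`. The `matchesPattern` helper is hypothetical, and matching against the base name only is an assumption made for this example, not a statement about how file.d applies the patterns.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// matchesPattern reports whether the base name of path matches a glob pattern.
// Matching only the base name is an assumption of this sketch.
func matchesPattern(pattern, path string) (bool, error) {
	return filepath.Match(pattern, filepath.Base(path))
}

func main() {
	paths := []string{
		"/var/lib/docker/containers/abc/abc-json.log",
		"/var/lib/docker/containers/abc/hostname",
	}
	for _, p := range paths {
		ok, err := matchesPattern("*-json.log", p)
		fmt.Println(p, ok, err)
	}
}
```

With the default pattern `*`, every base name matches, so nothing is filtered out.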
persistence_mode
string
default=async
options=async|sync
It defines how to save the offsets file:
- async – periodically saves the offsets using async_interval. The save is skipped if the offsets haven't changed. Suitable in most cases: it guarantees at-least-once delivery and adds almost no overhead.
- sync – saves offsets as part of event commitment. It's very slow but excludes the possibility of event duplication in extreme situations like power loss.
Save operation takes three steps:
- Write the temporary file with all offsets;
- Call fsync() on it;
- Rename the temporary file to the original one.
async_interval
! cfg.Duration
default=1s
Offsets saving interval. Only used if persistence_mode is set to async.
read_buffer_size
int
default=131072
The buffer size used for the file reading.
Each worker uses its own buffer, so the final memory consumption will be read_buffer_size*workers_count.
max_files
int
default=16384
The max amount of opened files. If the limit is exceeded, file.d will exit with fatal.
Also, it checks your system's file descriptors limit: ulimit -n.
offsets_op
string
default=continue
options=continue|tail|reset
An offset operation which will be performed when you add a file as a job:
- continue – uses the offset from the offsets file
- tail – sets the offset to the end of the file
- reset – resets the offset to the beginning of the file
It is only used on the initial scan of watching_dir. Files that are picked up later during work always use the reset operation.
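The three operations can be read as a choice of starting offset for each file found during the initial scan. The sketch below illustrates that reading. `initialOffset` is a hypothetical helper, and treating a missing saved offset as 0 is an assumption of the example.

```go
package main

import "fmt"

// initialOffset picks the starting read offset for a file found on the initial scan.
// savedOffset is the value from the offsets file (assumed 0 if there is none),
// size is the current size of the file in bytes.
func initialOffset(op string, savedOffset, size int64) (int64, error) {
	switch op {
	case "continue":
		// Resume from where the previous run stopped.
		return savedOffset, nil
	case "tail":
		// Skip existing data and read only what is appended later.
		return size, nil
	case "reset":
		// Re-read the file from the beginning.
		return 0, nil
	default:
		return 0, fmt.Errorf("unknown offsets_op %q", op)
	}
}

func main() {
	for _, op := range []string{"continue", "tail", "reset"} {
		off, err := initialOffset(op, 100, 250)
		fmt.Println(op, off, err)
	}
}
```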
workers_count
cfg.Expression
default=gomaxprocs*8
It defines how many workers will be instantiated. Each worker:
- Reads files (I/O bound)
- Decodes events (CPU bound)
We recommend setting it to 4x-8x of CPU cores.
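As a rough illustration of how the defaults combine, the sketch below evaluates the default expression gomaxprocs*8 and the read buffer memory read_buffer_size*workers_count. The helper names are hypothetical. Only the default values (131072 and gomaxprocs*8) come from this document.

```go
package main

import (
	"fmt"
	"runtime"
)

// defaultReadBufferSize is the documented default of read_buffer_size, in bytes.
const defaultReadBufferSize = 131072

// defaultWorkers mirrors the default expression gomaxprocs*8.
func defaultWorkers(gomaxprocs int) int {
	return gomaxprocs * 8
}

// bufferMemory returns read_buffer_size*workers_count in bytes.
func bufferMemory(readBufferSize, workers int) int {
	return readBufferSize * workers
}

func main() {
	workers := defaultWorkers(runtime.GOMAXPROCS(0))
	total := bufferMemory(defaultReadBufferSize, workers)
	fmt.Printf("workers=%d, read buffers=%d bytes\n", workers, total)
}
```

For example, with GOMAXPROCS equal to 4 the defaults give 32 workers and 131072*32 = 4194304 bytes (4 MiB) of read buffers.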
report_interval
cfg.Duration
default=5s
It defines how often to report statistical information to stdout.
maintenance_interval
cfg.Duration
default=10s
It defines how often to perform maintenance. For now, maintenance consists of two stages:
- Symlinks
- Jobs
Symlinks maintenance detects whether the underlying file of a symlink has changed.
Job maintenance calls fstat on tracked files to detect whether a new portion of data has been written to the file. If a job is in the done
state, it releases and reopens the file descriptor to allow third-party software to delete the file.
should_watch_file_changes
bool
default=false
It turns on watching for file modifications. Turning it on causes more CPU work, but makes it more likely that a file truncation is caught.
Generated using insane-doc
Documentation
¶
Index ¶
- Variables
- func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig)
- func NewJobProvider(config *Config, metrics *metricCollection, sugLogger *zap.SugaredLogger) *jobProvider
- func NewWatcher(path string, filenamePattern string, dirPattern string, notifyFn notifyFn, ...) *watcher
- type Config
- type InfoRegistry
- type Job
- type Plugin
- type ResetterRegistry
Variables ¶
var InfoRegistryInstance = &InfoRegistry{
	plugins: make(map[string]*Plugin),
}
var ResetterRegistryInstance = &ResetterRegistry{
	pipelineToResetter: make(map[string]*resetter),
}
ResetterRegistryInstance is an instance of the registry.
Functions ¶
func NewJobProvider ¶
func NewJobProvider(config *Config, metrics *metricCollection, sugLogger *zap.SugaredLogger) *jobProvider
func NewWatcher ¶
func NewWatcher(
	path string,
	filenamePattern string,
	dirPattern string,
	notifyFn notifyFn,
	shouldWatchWrites bool,
	notifyChannelLengthMetric prometheus.Gauge,
	logger *zap.SugaredLogger,
) *watcher
NewWatcher creates a watcher that sees file creations in the path and, if they match filenamePattern and dirPattern, passes them to notifyFn.
Types ¶
type Config ¶
type Config struct {
	// > @3@4@5@6
	// >
	// > The source directory to watch for files to process. All subdirectories also will be watched. E.g. if files have
	// > `/var/my-logs/$YEAR/$MONTH/$DAY/$HOST/$FACILITY-$PROGRAM.log` structure, `watching_dir` should be `/var/my-logs`.
	// > Also the `filename_pattern`/`dir_pattern` is useful to filter needless files/subdirectories. In the case of using two or more
	// > different directories, it's recommended to setup separate pipelines for each.
	WatchingDir string `json:"watching_dir" required:"true"` // *

	// > @3@4@5@6
	// >
	// > The filename to store offsets of processed files. Offsets are loaded only on initialization.
	// > > It's a `yaml` file. You can modify it manually.
	OffsetsFile    string `json:"offsets_file" required:"true"` // *
	OffsetsFileTmp string

	// > @3@4@5@6
	// >
	// > Files that don't meet this pattern will be ignored.
	// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
	FilenamePattern string `json:"filename_pattern" default:"*"` // *

	// > @3@4@5@6
	// >
	// > Dirs that don't meet this pattern will be ignored.
	// > > Check out [func Glob docs](https://golang.org/pkg/path/filepath/#Glob) for details.
	DirPattern string `json:"dir_pattern" default:"*"` // *

	// > @3@4@5@6
	// >
	// > It defines how to save the offsets file:
	// > @persistenceMode|comment-list
	// >
	// > Save operation takes three steps:
	// > * Write the temporary file with all offsets;
	// > * Call `fsync()` on it;
	// > * Rename the temporary file to the original one.
	PersistenceMode  string `json:"persistence_mode" default:"async" options:"async|sync"` // *
	PersistenceMode_ persistenceMode

	AsyncInterval  cfg.Duration `json:"async_interval" default:"1s" parse:"duration"` // *! @3 @4 @5 @6 <br> <br> Offsets saving interval. Only used if `persistence_mode` is set to `async`.
	AsyncInterval_ time.Duration

	// > @3@4@5@6
	// >
	// > The buffer size used for the file reading.
	// > > Each worker uses its own buffer so that final memory consumption will be `read_buffer_size*workers_count`.
	ReadBufferSize int `json:"read_buffer_size" default:"131072"` // *

	// > @3@4@5@6
	// >
	// > The max amount of opened files. If the limit is exceeded, `file.d` will exit with fatal.
	// > > Also, it checks your system's file descriptors limit: `ulimit -n`.
	MaxFiles int `json:"max_files" default:"16384"` // *

	// > @3@4@5@6
	// >
	// > An offset operation which will be performed when you add a file as a job:
	// > @offsetsOp|comment-list
	// > > It is only used on an initial scan of `watching_dir`. Files that will be caught up later during work will always use `reset` operation.
	OffsetsOp  string `json:"offsets_op" default:"continue" options:"continue|tail|reset"` // *
	OffsetsOp_ offsetsOp

	// > @3@4@5@6
	// >
	// > It defines how many workers will be instantiated.
	// > Each worker:
	// > * Reads files (I/O bound)
	// > * Decodes events (CPU bound)
	// > > We recommend to set it to 4x-8x of CPU cores.
	WorkersCount  cfg.Expression `json:"workers_count" default:"gomaxprocs*8" parse:"expression"` // *
	WorkersCount_ int

	// > @3@4@5@6
	// >
	// > It defines how often to report statistical information to stdout
	ReportInterval  cfg.Duration `json:"report_interval" default:"5s" parse:"duration"` // *
	ReportInterval_ time.Duration

	// > @3@4@5@6
	// >
	// > It defines how often to perform maintenance
	// > @maintenance
	MaintenanceInterval  cfg.Duration `json:"maintenance_interval" default:"10s" parse:"duration"` // *
	MaintenanceInterval_ time.Duration

	// > @3@4@5@6
	// >
	// > It turns on watching for file modifications. Turning it on cause more CPU work, but it is more probable to catch file truncation
	ShouldWatchChanges bool `json:"should_watch_file_changes" default:"false"` // *
}
type InfoRegistry ¶ added in v0.25.0
type InfoRegistry struct {
// contains filtered or unexported fields
}
func (*InfoRegistry) AddPlugin ¶ added in v0.25.0
func (ir *InfoRegistry) AddPlugin(pipelineName string, plug *Plugin)
func (*InfoRegistry) Info ¶ added in v0.25.0
func (ir *InfoRegistry) Info(w http.ResponseWriter, r *http.Request)
type Plugin ¶
type Plugin struct {
// contains filtered or unexported fields
}
type ResetterRegistry ¶
type ResetterRegistry struct {
// contains filtered or unexported fields
}
ResetterRegistry is a registry that holds a map of pipeline names to file plugins and functions to reset their offsets.
func (*ResetterRegistry) AddResetter ¶
func (rr *ResetterRegistry) AddResetter(pipelineName string, plug *Plugin)
AddResetter adds plugin to the ResetterRegistry.
func (*ResetterRegistry) Reset ¶
func (rr *ResetterRegistry) Reset(_ http.ResponseWriter, request *http.Request)
Reset truncates jobs if the plugin has started, or deletes the whole offset file, or just one entry if inode or source_id was set in the request.