reader

package
v5.2.1+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 9, 2017 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package reader provides interface and struct to read messages and report them to a harvester

The interface used is:

type Reader interface {
	Next() (Message, error)
}

Each time Next is called on a reader, a Message object is returned.

Index

Constants

View Source
const (
	JsonErrorKey = "json_error"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Encode

type Encode struct {
	// contains filtered or unexported fields
}

Encode reader produces lines by reading lines from an io.Reader through a decoder converting the reader it's encoding to utf-8.

func NewEncode

func NewEncode(
	reader io.Reader,
	codec encoding.Encoding,
	bufferSize int,
) (Encode, error)

NewEncode creates a new Encode reader from input reader by applying the given codec.

func (Encode) Next

func (p Encode) Next() (Message, error)

Next reads the next line from it's initial io.Reader This converts a io.Reader to a reader.reader

type JSON

type JSON struct {
	// contains filtered or unexported fields
}

func NewJSON

func NewJSON(r Reader, cfg *JSONConfig) *JSON

NewJSONReader creates a new reader that can decode JSON.

func (*JSON) Next

func (r *JSON) Next() (Message, error)

Next decodes JSON and returns the filled Line object.

type JSONConfig

type JSONConfig struct {
	MessageKey    string `config:"message_key"`
	KeysUnderRoot bool   `config:"keys_under_root"`
	OverwriteKeys bool   `config:"overwrite_keys"`
	AddErrorKey   bool   `config:"add_error_key"`
}

func (*JSONConfig) Validate

func (c *JSONConfig) Validate() error

type Limit

type Limit struct {
	// contains filtered or unexported fields
}

LimitProcessor sets an upper limited on line length. Lines longer then the max configured line length will be snapped short.

func NewLimit

func NewLimit(r Reader, maxBytes int) *Limit

NewLimit creates a new reader limiting the line length.

func (*Limit) Next

func (p *Limit) Next() (Message, error)

Next returns the next line.

type Line

type Line struct {
	// contains filtered or unexported fields
}

lineReader reads lines from underlying reader, decoding the input stream using the configured codec. The reader keeps track of bytes consumed from raw input stream for every decoded line.

func NewLine

func NewLine(input io.Reader, codec encoding.Encoding, bufferSize int) (*Line, error)

NewLine creates a new Line reader object

func (*Line) Next

func (l *Line) Next() ([]byte, int, error)

Next reads the next line until the new line character

type Message

type Message struct {
	Ts      time.Time     // timestamp the content was read
	Content []byte        // actual content read
	Bytes   int           // total number of bytes read to generate the message
	Fields  common.MapStr // optional fields that can be added by reader
}

Message represents a reader event with timestamp, content and actual number of bytes read from input before decoding.

func (*Message) AddFields

func (msg *Message) AddFields(fields common.MapStr)

func (*Message) IsEmpty

func (m *Message) IsEmpty() bool

IsEmpty returns true in case the message is empty A message with only newline character is counted as an empty message

type Multiline

type Multiline struct {
	// contains filtered or unexported fields
}

MultiLine reader combining multiple line events into one multi-line event.

Lines to be combined are matched by some configurable predicate using regular expression.

The maximum number of bytes and lines to be returned is fully configurable. Even if limits are reached subsequent lines are matched, until event is fully finished.

Errors will force the multiline reader to return the currently active multiline event first and finally return the actual error on next call to Next.

func NewMultiline

func NewMultiline(
	reader Reader,
	separator string,
	maxBytes int,
	config *MultilineConfig,
) (*Multiline, error)

NewMultiline creates a new multi-line reader combining stream of line events into stream of multi-line events.

func (*Multiline) Next

func (mlr *Multiline) Next() (Message, error)

Next returns next multi-line event.

type MultilineConfig

type MultilineConfig struct {
	Negate   bool           `config:"negate"`
	Match    string         `config:"match"       validate:"required"`
	MaxLines *int           `config:"max_lines"`
	Pattern  *regexp.Regexp `config:"pattern"`
	Timeout  *time.Duration `config:"timeout"     validate:"positive"`
}

func (*MultilineConfig) Validate

func (c *MultilineConfig) Validate() error

type Reader

type Reader interface {
	Next() (Message, error)
}

Reader is the interface that wraps the basic Next method for getting a new message. Next returns the message being read or and error. EOF is returned if reader will not return any new message on subsequent calls.

type StripNewline

type StripNewline struct {
	// contains filtered or unexported fields
}

StripNewline reader removes the last trailing newline characters from read lines.

func NewStripNewline

func NewStripNewline(r Reader) *StripNewline

NewStripNewline creates a new line reader stripping the last tailing newline.

func (*StripNewline) Next

func (p *StripNewline) Next() (Message, error)

Next returns the next line.

type Timeout

type Timeout struct {
	// contains filtered or unexported fields
}

timeoutProcessor will signal some configurable timeout error if no new line can be returned in time.

func NewTimeout

func NewTimeout(reader Reader, signal error, t time.Duration) *Timeout

NewTimeout returns a new timeout reader from an input line reader.

func (*Timeout) Next

func (p *Timeout) Next() (Message, error)

Next returns the next line. If no line was returned before timeout, the configured timeout error is returned. For handline timeouts a goroutine is started for reading lines from configured line reader. Only when underlying reader returns an error, the goroutine will be finished.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL