fluentd_forwarder

package module
v0.0.0-...-b7c3958 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2017 License: Apache-2.0 Imports: 27 Imported by: 6

README

fluentd-forwarder

A lightweight Fluentd forwarder written in Go.

Requirements

  • Go v1.4.1 or above
  • Set the $GOPATH environment variable to get fluentd_forwarder under $GOPATH/bin directory.

Build Instructions

To install the required dependencies and build fluentd_forwarder do:

$ go get github.com/fluent/fluentd-forwarder/entrypoints/build_fluentd_forwarder
$ bin/build_fluentd_forwarder fluentd_forwarder

Running fluentd_forwarder

$ $GOPATH/bin/fluentd_forwarder

Without arguments, it simply listens on 127.0.0.1:24224 and forwards the events to 127.0.0.1:24225.

It gracefully stops in response to SIGINT.

If you want to specify where to forward the events, try the following:

$ $GOPATH/bin/fluentd_forwarder -to fluent://some-remote-node.local:24224

Command-line Options

  • -retry-interval

    Retry interval in which connection is tried against the remote agent.

    -retry-interval 5s
    
  • -conn-timeout

    Connection timeout after which the connection has failed.

    -conn-timeout 10s
    
  • -write-timeout

    Write timeout on wire.

    -write-timeout 30s
    
  • -flush-interval

    Flush interval in which the events are forwareded to the remote agent .

    -flush-interval 5s
    
  • -listen-on

    Interface address and port on which the forwarder listens.

    -listen-on 127.0.0.1:24224
    
  • -to

    Host and port to which the events are forwarded.

    -to remote-host.local:24225
    -to fluent://remote-host.local:24225
    -to td+https://urlencoded-api-key@/*/*
    -to td+https://urlencoded-api-key@/database/*
    -to td+https://urlencoded-api-key@/database/table
    -to td+https://urlencoded-api-key@endpoint/*/*
    
  • -ca-certs

    SSL CA certficates to be verified against when the secure connection is used. Must be in PEM format. You can use the one bundled with td-client-ruby.

    -ca-certs ca-bundle.crt
    
  • -buffer-path

    Directory / path on which buffer files are created. * may be used within the path to indicate the prefix or suffix like var/pre*suf

    -buffer-path /var/lib/fluent-forwarder
    -buffer-path /var/lib/fluent-forwarder/prefix*suffix
    
  • -buffer-chunk-limit

    Maximum size of a buffer chunk

    -buffer-chunk-limit 16777216
    
  • -parallelism

    Number of simultaneous connections used to submit events. It takes effect only when the target is td+http(s).

    -parallelism 1
    
  • -log-level

    Logging level. Any one of the following values; CRITICAL, ERROR, WARNING, NOTICE, INFO and DEBUG.

    -log-level DEBUG
    
  • -log-file

    Species the path to the log file. By default logging is performed to the standard error. It may contain strftime(3)-like format specifications like %Y in any positions. If the parent directory doesn't exist at the time the logging is performed, all the leading directories are created automatically so you can specify the path like /var/log/fluentd_forwarder/%Y-%m-%d/fluentd_forwarder.log

    -log-file /var/log/fluentd_forwarder.log
    
  • -config

    Specifies the path to the configuration file. The syntax is detailed below.

    -config /etc/fluentd-forwarder/fluentd-forwarder.cfg
    
  • -metadata

    Specifies the additional data to insert metadata record. The syntax is detailed below.

    -metadata "custom metadata"
    

Configuration File

The syntax of the configuration file is so-called INI format with the name of the primary section being fluentd-forwarder. Each setting is named exactly the same as the command-line counterpart, except for -config. (It is not possible to refer to another configuation file from a configuration file)

[fluentd-forwarder]
to = fluent://remote.local:24224
buffer-chunk-limit = 16777216
flush-interval = 10s
retry-interval = 1s

Dependencies

fluentd_forwarder depends on the following external libraries:

  • github.com/ugorji/go/codec
  • github.com/op/go-logging
  • github.com/jehiah/go-strftime
  • github.com/moriyoshi/go-ioextras
  • gopkg.in/gcfg.v1
  • github.com/treasure-data/td-client-go

License

The source code and its object form ("Work"), unless otherwise specified, are licensed under the Apache Software License, Version 2.0. You may not use the Work except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

A portion of the code originally written by Moriyoshi Koizumi and later modified by Treasure Data, Inc. continues to be published and distributed under the same terms and conditions as the MIT license, with its authorship being attributed to the both parties. It is specified at the top of the applicable source files.

Documentation

Index

Constants

View Source
const (
	Head = JournalFileType('b')
	Rest = JournalFileType('q')
)

Variables

View Source
var NilJournalPathInfo = JournalPathInfo{"", 0, "", "", 0, nil}

Functions

func BuildJournalPathWithTSuffix

func BuildJournalPathWithTSuffix(key string, bq JournalFileType, tSuffix string) string

func IsValidJournalPathInfo

func IsValidJournalPathInfo(info JournalPathInfo) bool

Types

type CompressingBlob

type CompressingBlob struct {
	// contains filtered or unexported fields
}

func NewCompressingBlob

func NewCompressingBlob(blob td_client.Blob, bufferSize int, level int, tempFactory ioextras.RandomAccessStoreFactory) *CompressingBlob

func (*CompressingBlob) Dispose

func (blob *CompressingBlob) Dispose() error

func (*CompressingBlob) MD5Sum

func (blob *CompressingBlob) MD5Sum() ([]byte, error)

func (*CompressingBlob) Reader

func (blob *CompressingBlob) Reader() (io.ReadCloser, error)

func (*CompressingBlob) Size

func (blob *CompressingBlob) Size() (int64, error)

type CompressingBlobReader

type CompressingBlobReader struct {
	// contains filtered or unexported fields
}

func (*CompressingBlobReader) Close

func (reader *CompressingBlobReader) Close() error

func (*CompressingBlobReader) Read

func (reader *CompressingBlobReader) Read(p []byte) (int, error)

type ConnectionCountTopic

type ConnectionCountTopic struct{}

type Disposable

type Disposable interface {
	Dispose() error
}

type EntryCountTopic

type EntryCountTopic struct{}

type Errors

type Errors []error

func (Errors) Error

func (e Errors) Error() string

type FileJournal

type FileJournal struct {
	// contains filtered or unexported fields
}

func (*FileJournal) AddFlushListener

func (journal *FileJournal) AddFlushListener(listener JournalChunkListener)

func (*FileJournal) AddNewChunkListener

func (journal *FileJournal) AddNewChunkListener(listener JournalChunkListener)

func (*FileJournal) Dispose

func (journal *FileJournal) Dispose() error

func (*FileJournal) Flush

func (journal *FileJournal) Flush(visitor func(JournalChunk) interface{}) error

func (*FileJournal) Key

func (journal *FileJournal) Key() string

func (*FileJournal) TailChunk

func (journal *FileJournal) TailChunk() JournalChunk

func (*FileJournal) Write

func (journal *FileJournal) Write(data []byte) error

type FileJournalChunk

type FileJournalChunk struct {
	Size int64 // This variable must be on 64-bit alignment. Otherwise atomic.AddInt64 will cause a crash on ARM and x86-32

	Path      string
	Type      JournalFileType
	TSuffix   string
	Timestamp int64
	UniqueId  []byte
	// contains filtered or unexported fields
}

type FileJournalChunkDequeue

type FileJournalChunkDequeue struct {
	// contains filtered or unexported fields
}

type FileJournalChunkDequeueHead

type FileJournalChunkDequeueHead struct {
	// contains filtered or unexported fields
}

type FileJournalChunkWrapper

type FileJournalChunkWrapper struct {
	// contains filtered or unexported fields
}

func (*FileJournalChunkWrapper) Dispose

func (wrapper *FileJournalChunkWrapper) Dispose() error

func (*FileJournalChunkWrapper) Dup

func (wrapper *FileJournalChunkWrapper) Dup() JournalChunk

func (*FileJournalChunkWrapper) Id

func (wrapper *FileJournalChunkWrapper) Id() string

func (*FileJournalChunkWrapper) MD5Sum

func (wrapper *FileJournalChunkWrapper) MD5Sum() ([]byte, error)

func (*FileJournalChunkWrapper) NextChunk

func (wrapper *FileJournalChunkWrapper) NextChunk() JournalChunk

func (*FileJournalChunkWrapper) Path

func (wrapper *FileJournalChunkWrapper) Path() (string, error)

func (*FileJournalChunkWrapper) Reader

func (wrapper *FileJournalChunkWrapper) Reader() (io.ReadCloser, error)

func (*FileJournalChunkWrapper) Size

func (wrapper *FileJournalChunkWrapper) Size() (int64, error)

func (*FileJournalChunkWrapper) String

func (wrapper *FileJournalChunkWrapper) String() string

type FileJournalGroup

type FileJournalGroup struct {
	// contains filtered or unexported fields
}

func (*FileJournalGroup) Dispose

func (journalGroup *FileJournalGroup) Dispose() error

func (*FileJournalGroup) GetFileJournal

func (journalGroup *FileJournalGroup) GetFileJournal(key string) *FileJournal

func (*FileJournalGroup) GetJournal

func (journalGroup *FileJournalGroup) GetJournal(key string) Journal

func (*FileJournalGroup) GetJournalKeys

func (journalGroup *FileJournalGroup) GetJournalKeys() []string

type FileJournalGroupFactory

type FileJournalGroupFactory struct {
	// contains filtered or unexported fields
}

func NewFileJournalGroupFactory

func NewFileJournalGroupFactory(
	logger *logging.Logger,
	randSource rand.Source,
	timeGetter func() time.Time,
	defaultPathSuffix string,
	defaultFileMode os.FileMode,
	maxSize int64,
) *FileJournalGroupFactory

func (*FileJournalGroupFactory) GetJournalGroup

func (factory *FileJournalGroupFactory) GetJournalGroup(path string, worker Worker) (*FileJournalGroup, error)

type FluentRecord

type FluentRecord struct {
	Tag       string
	Timestamp uint64
	Data      map[string]interface{}
}

type FluentRecordSet

type FluentRecordSet struct {
	Tag     string
	Records []TinyFluentRecord
}

type ForwardInput

type ForwardInput struct {
	// contains filtered or unexported fields
}

func NewForwardInput

func NewForwardInput(logger *logging.Logger, bind string, port Port) (*ForwardInput, error)

func (*ForwardInput) Start

func (input *ForwardInput) Start()

func (*ForwardInput) Stop

func (input *ForwardInput) Stop()

func (*ForwardInput) String

func (input *ForwardInput) String() string

func (*ForwardInput) WaitForShutdown

func (input *ForwardInput) WaitForShutdown()

type ForwardInputFactory

type ForwardInputFactory struct{}

type ForwardOutput

type ForwardOutput struct {
	// contains filtered or unexported fields
}

func NewForwardOutput

func NewForwardOutput(logger *logging.Logger, bind string, retryInterval time.Duration, connectionTimeout time.Duration, writeTimeout time.Duration, flushInterval time.Duration, journalGroupPath string, maxJournalChunkSize int64, metadata string) (*ForwardOutput, error)

func (*ForwardOutput) Emit

func (output *ForwardOutput) Emit(recordSets []FluentRecordSet) error

func (*ForwardOutput) Start

func (output *ForwardOutput) Start()

func (*ForwardOutput) Stop

func (output *ForwardOutput) Stop()

func (*ForwardOutput) String

func (output *ForwardOutput) String() string

func (*ForwardOutput) WaitForShutdown

func (output *ForwardOutput) WaitForShutdown()

type Journal

type Journal interface {
	Disposable
	Key() string
	Write(data []byte) error
	TailChunk() JournalChunk
	AddNewChunkListener(JournalChunkListener)
	AddFlushListener(JournalChunkListener)
	Flush(func(JournalChunk) interface{}) error
}

type JournalChunk

type JournalChunk interface {
	Disposable
	Id() string
	String() string
	Size() (int64, error)
	Reader() (io.ReadCloser, error)
	NextChunk() JournalChunk
	MD5Sum() ([]byte, error)
	Dup() JournalChunk
}

type JournalChunkListener

type JournalChunkListener interface {
	NewChunkCreated(JournalChunk) error
	ChunkFlushed(JournalChunk) error
}

type JournalFileType

type JournalFileType rune

type JournalGroup

type JournalGroup interface {
	Disposable
	GetJournal(key string) Journal
	GetJournalKeys() []string
}

type JournalGroupFactory

type JournalGroupFactory interface {
	GetJournalGroup() JournalGroup
}

type JournalPathInfo

type JournalPathInfo struct {
	Key             string
	Type            JournalFileType
	VariablePortion string
	TSuffix         string
	Timestamp       int64 // elapsed time in msec since epoch
	UniqueId        []byte
}

func BuildJournalPath

func BuildJournalPath(key string, bq JournalFileType, time_ time.Time, randValue int64) JournalPathInfo

func DecodeJournalPath

func DecodeJournalPath(variablePortion string) (JournalPathInfo, error)

type Panicked

type Panicked struct {
	Opaque interface{}
}

func (*Panicked) Error

func (e *Panicked) Error() string

type Port

type Port interface {
	Emit(recordSets []FluentRecordSet) error
}

type TDOutput

type TDOutput struct {
	// contains filtered or unexported fields
}

func NewTDOutput

func NewTDOutput(
	logger *logging.Logger,
	endpoint string,
	connectionTimeout time.Duration,
	writeTimeout time.Duration,
	flushInterval time.Duration,
	parallelism int,
	journalGroupPath string,
	maxJournalChunkSize int64,
	apiKey string,
	databaseName string,
	tableName string,
	tempDir string,
	useSsl bool,
	rootCAs *x509.CertPool,
	httpProxy string,
	metadata string,
) (*TDOutput, error)

func (*TDOutput) Emit

func (output *TDOutput) Emit(recordSets []FluentRecordSet) error

func (*TDOutput) Start

func (output *TDOutput) Start()

func (*TDOutput) Stop

func (output *TDOutput) Stop()

func (*TDOutput) String

func (output *TDOutput) String() string

func (*TDOutput) WaitForShutdown

func (output *TDOutput) WaitForShutdown()

type TinyFluentRecord

type TinyFluentRecord struct {
	Timestamp uint64
	Data      map[string]interface{}
}

type Worker

type Worker interface {
	String() string
	Start()
	Stop()
	WaitForShutdown()
}

type WorkerSet

type WorkerSet struct {
	// contains filtered or unexported fields
}

func NewWorkerSet

func NewWorkerSet() *WorkerSet

func (*WorkerSet) Add

func (set *WorkerSet) Add(worker Worker)

func (*WorkerSet) Remove

func (set *WorkerSet) Remove(worker Worker)

func (*WorkerSet) Slice

func (set *WorkerSet) Slice() []Worker

Directories

Path Synopsis
entrypoints

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL