concator

package module
v1.13.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2020 License: MIT Imports: 29 Imported by: 0

README

Go-Fluentd

Rewrite fluentd-server by Golang, Higher performance with less resource requirement.

  • At-Least-Once guarantee(disk WAL)
  • log concatenation by head regexp expression
  • log parsing by regexp expression(support embedded json)
  • log filter by custom plugins(acceptorFilters & tagFilters)
  • multiple receivers(support multiple protocols: msgpack, http, syslog, kafka, ...)
  • multiple senders(support multiple backend: elasticsearch, fluentd, ...)
  • multiple environments deployment(--env: sit, perf, uat, prod)

GitHub release Build Status codecov Commitizen friendly Go Report Card GoDoc

Already running on our PRODUCION since 2018/9.

When processing 1000mbps logs flood:

  • dstat dstat
  • monitor monitor
  • profile profile

Documents:

Description

LogAggregator + Concator + Parser + Producer.

Origin logs emitted from docker look like:

'{"container_id": "xxxxx", "log": "2018-03-06 16:56:22.514 | mscparea | ERROR  | http-nio-8080-exec-1 | com.laisky.cloud.cp.core.service.impl.CPBusiness.reflectAdapterRequest | 84:'
'{"container_id": "xxxxx", "log": "Exception in thread "main" java.lang.IllegalStateException: A book has a null property"}'
'{"container_id": "xxxxx", "log": "\tat com.example.myproject.Author.getBookIds(Author.java:38)"}'
'{"container_id": "xxxxx", "log": "\tat com.example.myproject.Bootstrap.main(Bootstrap.java:14)"}'
'{"container_id": "xxxxx", "log": "Caused by: java.lang.NullPointerException"}'
'{"container_id": "xxxxx", "log": "\tat com.example.myproject.Book.getId(Book.java:22)"}'
'{"container_id": "xxxxx", "log": "\tat com.example.myproject.Author.getBookIds(Author.java:35)"}'
'{"container_id": "xxxxx", "log": "\t... 1 more"}'

After Concator(TagPipeline > concator_f):

&FluentMsg{
    Id: 12345,
    Tag: "spring.sit",
    Message: map[string]interface{}{
        "container_id": "xxxxx",
        "log": "2018-03-06 16:56:22.514 | mscparea | ERROR  | http-nio-8080-exec-1 | com.laisky.cloud.cp.core.service.impl.CPBusiness.reflectAdapterRequest | 84: Exception in thread "main" java.lang.IllegalStateException: A book has a null property\n\tat com.example.myproject.Author.getBookIds(Author.java:38)\n\tat com.example.myproject.Bootstrap.main(Bootstrap.java:14)\nCaused by: java.lang.NullPointerException\n\tat com.example.myproject.Book.getId(Book.java:22)\n\tat com.example.myproject.Author.getBookIds(Author.java:35)\n\t... 1 more",
    },
}

After Parser(TagPipeline > parser_f):

&FluentMsg{
    Id: 12345,
    Tag: "spring.sit",
    Message: map[string]interface{}{
        "container_id": "xxxxx",
        "time": "2018-03-06 16:56:22.514",
        "level": "ERROR",
        "app": "mscparea",
        "thread": "http-nio-8080-exec-1",
        "class": "com.laisky.cloud.cp.core.service.impl.CPBusiness.reflectAdapterRequest",
        "line": 84,
        "message": "Exception in thread "main" java.lang.IllegalStateException: A book has a null property\n\tat com.example.myproject.Author.getBookIds(Author.java:38)\n\tat com.example.myproject.Bootstrap.main(Bootstrap.java:14)\nCaused by: java.lang.NullPointerException\n\tat com.example.myproject.Book.getId(Book.java:22)\n\tat com.example.myproject.Author.getBookIds(Author.java:35)\n\t... 1 more",
    },
}

Then Producer can send logs to anywhere (depends on Senders).

Run

directly run:

go run -race entrypoints/main.go \
  --config=./docs/settings/tiny_settings.yml \
  --env=sit \
  --log-level=debug

run by docker:

docker run -itd --rm --name=go-fluentd -p 24225:24225 -p 8080:8080 \
    -v /etc/configs/go-fluentd:/etc/go-fluentd \
    -v /data/log/fluentd/go-fluentd:/data/log/fluentd/go-fluentd
    ppcelery/go-fluentd:1.13.1 \
    ./go-fluentd \
        --config=/etc/go-fluentd/settings.yml \
        --env=perf \
        --addr=0.0.0.0:8080
        --host=x.x.x.x
        --enable-auto-gc
docker images version
  • stable
  • release
  • dev
  • <feature taskid>

Roles

  • Acceptor (consists of Recvs)
  • AcceptorPipeline (consists of AcceptorFilters)
  • Journal
  • Dispatcher
  • TagPipeline (consists of TagFilters)
    • Concator
    • Parser for each tag
  • PostPipeline (consists of PostFilters)
  • Producer (consists of Senders)

architecture

Acceptor

Contains multiply Recvs (such as KafkRecv & FluentdRecv), can listening tcp port or fetch msg from kafka brokers.

AcceptorPipeline

Contains multiply AcceptorFilters, be used for ignore or retag specific messages. All filters should return very fast to avoid blocking.

Journal

...

Dispatcher

...

TagPipeline

...

PostPipeline

...

Producer

...

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RunServer

func RunServer(ctx context.Context, addr string)

RunServer starting http server

func StringListContains

func StringListContains(ls []string, v string) bool

Types

type Acceptor

type Acceptor struct {
	*AcceptorCfg
	// contains filtered or unexported fields
}

Acceptor listening tcp connection, and decode messages

func NewAcceptor

func NewAcceptor(cfg *AcceptorCfg, recvs ...recvs.AcceptorRecvItf) *Acceptor

NewAcceptor create new Acceptor

func (*Acceptor) GetAsyncOutChan

func (a *Acceptor) GetAsyncOutChan() chan *libs.FluentMsg

GetAsyncOutChan return the message chan that received by blockable acceptor

func (*Acceptor) GetSyncOutChan

func (a *Acceptor) GetSyncOutChan() chan *libs.FluentMsg

GetSyncOutChan return the message chan that received by acceptor

func (*Acceptor) Run

func (a *Acceptor) Run(ctx context.Context)

Run starting acceptor to listening and receive messages, you can use `acceptor.MessageChan()` to load messages`

type AcceptorCfg

type AcceptorCfg struct {
	MsgPool                           *sync.Pool
	Journal                           *Journal
	AsyncOutChanSize, SyncOutChanSize int
	MaxRotateID                       int64
}

AcceptorCfg is the configuation of Acceptor

type Controllor

type Controllor struct {
	// contains filtered or unexported fields
}

Controllor is an IoC that manage all roles

func NewControllor

func NewControllor() (c *Controllor)

NewControllor create new Controllor

func (*Controllor) Run

func (c *Controllor) Run(ctx context.Context)

Run starting all pipeline

type Dispatcher

type Dispatcher struct {
	*DispatcherCfg
	// contains filtered or unexported fields
}

Dispatcher dispatch messages by tag to different concator

func NewDispatcher

func NewDispatcher(cfg *DispatcherCfg) *Dispatcher

NewDispatcher create new Dispatcher

func (*Dispatcher) GetOutChan

func (d *Dispatcher) GetOutChan() chan *libs.FluentMsg

func (*Dispatcher) Run

func (d *Dispatcher) Run(ctx context.Context)

Run dispacher to dispatch messages to different concators

type DispatcherCfg

type DispatcherCfg struct {
	InChan             chan *libs.FluentMsg
	TagPipeline        tagFilters.TagPipelineItf
	NFork, OutChanSize int
}

type Journal

type Journal struct {
	*JournalCfg
	// contains filtered or unexported fields
}

Journal dumps all messages to files, then check every msg with committed id to make sure no msg lost

func NewJournal

func NewJournal(ctx context.Context, cfg *JournalCfg) *Journal

NewJournal create new Journal with `bufDirPath` and `BufSizeBytes`

func (*Journal) CloseTag added in v1.11.0

func (j *Journal) CloseTag(tag string) error

func (*Journal) ConvertMsg2Buf

func (j *Journal) ConvertMsg2Buf(msg *libs.FluentMsg, data *map[string]interface{})

func (*Journal) DumpMsgFlow

func (j *Journal) DumpMsgFlow(ctx context.Context, msgPool *sync.Pool, dumpChan, skipDumpChan chan *libs.FluentMsg) chan *libs.FluentMsg

func (*Journal) GetCommitChan

func (j *Journal) GetCommitChan() chan<- *libs.FluentMsg

func (*Journal) GetOutChan

func (j *Journal) GetOutChan() chan *libs.FluentMsg

func (*Journal) LoadMaxID added in v1.8.9

func (j *Journal) LoadMaxID() (maxID int64, err error)

LoadMaxID load the max committed id from journal

func (*Journal) ProcessLegacyMsg

func (j *Journal) ProcessLegacyMsg(dumpChan chan *libs.FluentMsg) (maxID int64, err2 error)

type JournalCfg

type JournalCfg struct {
	BufDirPath   string
	BufSizeBytes int64
	JournalOutChanLen,
	CommitIDChanLen,
	ChildJournalDataInchanLen,
	ChildJournalIDInchanLen int
	GCIntervalSec  time.Duration
	IsCompress     bool
	MsgPool        *sync.Pool
	CommittedIDTTL time.Duration
}

type Producer

type Producer struct {
	*ProducerCfg
	sync.Mutex
	// contains filtered or unexported fields
}

Producer send messages to downstream

func NewProducer

func NewProducer(cfg *ProducerCfg, senders ...senders.SenderItf) (*Producer, error)

NewProducer create new producer

func (*Producer) Run

func (p *Producer) Run(ctx context.Context)

Run starting <n> Producer to send messages

type ProducerCfg

type ProducerCfg struct {
	DistributeKey          string
	InChan                 chan *libs.FluentMsg
	MsgPool                *sync.Pool
	CommitChan             chan<- *libs.FluentMsg
	NFork, DiscardChanSize int
}

Directories

Path Synopsis
Package recvs defines different kind of receivers.
Package recvs defines different kind of receivers.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL