senders

package
v1.13.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2020 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func LoadESTagIndexMap

func LoadESTagIndexMap(env string, mapi interface{}) map[string]string

func NewKafkaProducer

func NewKafkaProducer(brokers []string) (p sarama.SyncProducer, err error)

Types

type BaseSender

// BaseSender is the base type embedded (as *BaseSender) by the concrete
// senders in this package; its methods are thereby promoted onto them.
type BaseSender struct {
	// NOTE(review): presumably the value reported by DiscardWhenBlocked — confirm.
	IsDiscardWhenBlocked bool
	// contains filtered or unexported fields
}

BaseSender is the base type embedded by the concrete senders. Senders should not put messages back into the message pool themselves.

func (*BaseSender) DiscardWhenBlocked

func (s *BaseSender) DiscardWhenBlocked() bool

func (*BaseSender) IsTagSupported

func (s *BaseSender) IsTagSupported(tag string) bool

func (*BaseSender) SetCommitChan

func (s *BaseSender) SetCommitChan(commitChan chan<- *libs.FluentMsg)

func (*BaseSender) SetFailedChan added in v1.12.3

func (s *BaseSender) SetFailedChan(failedChan chan<- *libs.FluentMsg)

func (*BaseSender) SetMsgPool

func (s *BaseSender) SetMsgPool(msgPool *sync.Pool)

func (*BaseSender) SetSuccessedChan added in v1.12.3

func (s *BaseSender) SetSuccessedChan(successedChan chan<- *libs.FluentMsg)

func (*BaseSender) SetSupportedTags

func (s *BaseSender) SetSupportedTags(tags []string)

type ESIndexResp

// ESIndexResp holds the values decoded from the JSON keys "_id", "_index" and
// "status".
//
// NOTE(review): presumably the per-item result of an Elasticsearch bulk
// "index" operation — confirm against the code that decodes the response.
type ESIndexResp struct {
	Id     string `json:"_id"`
	Index  string `json:"_index"`
	Status int    `json:"status"`
}

type ESOpResp

// ESOpResp wraps an optional *ESIndexResp decoded from the JSON key "index".
// Index is a pointer, so it is nil when the key is absent or null.
type ESOpResp struct {
	Index *ESIndexResp `json:"index"`
}

type ESResp

// ESResp holds the boolean decoded from the JSON key "errors".
//
// NOTE(review): presumably the top-level flag of an Elasticsearch bulk
// response that signals at least one failed item — confirm.
type ESResp struct {
	Errors bool `json:"errors"`
}

type ElasticSearchSender

// ElasticSearchSender embeds *BaseSender and *ElasticSearchSenderCfg, so the
// base sender's methods and the configuration fields are promoted onto it.
// It is created with NewElasticSearchSender.
type ElasticSearchSender struct {
	*BaseSender
	*ElasticSearchSenderCfg
	// contains filtered or unexported fields
}

func NewElasticSearchSender

func NewElasticSearchSender(cfg *ElasticSearchSenderCfg) *ElasticSearchSender

func (*ElasticSearchSender) GetName

func (s *ElasticSearchSender) GetName() string

func (*ElasticSearchSender) SendBulkMsgs

func (s *ElasticSearchSender) SendBulkMsgs(bulkCtx *bulkOpCtx, msgs []*libs.FluentMsg) (err error)

func (*ElasticSearchSender) Spawn

func (s *ElasticSearchSender) Spawn(ctx context.Context) chan<- *libs.FluentMsg

type ElasticSearchSenderCfg

// ElasticSearchSenderCfg is the configuration accepted by
// NewElasticSearchSender.
//
// NOTE(review): the per-field notes below are inferred from the field names
// only; confirm them against the sender implementation.
type ElasticSearchSenderCfg struct {
	// Name is presumably the sender name reported by GetName.
	Name string

	// Addr is presumably the address of the Elasticsearch endpoint.
	Addr string

	// TagKey is presumably the message key under which the tag is stored.
	TagKey string

	// Tags is presumably the list of tags this sender accepts.
	Tags []string

	// BatchSize is presumably the maximum number of messages per bulk request.
	BatchSize int

	// InChanSize is presumably the capacity of the channel returned by Spawn.
	InChanSize int

	// NFork is presumably the number of parallel workers started by Spawn.
	NFork int

	// MaxWait is presumably the longest time a partial batch is held back.
	MaxWait time.Duration

	// TagIndexMap presumably maps a tag to its index name; see LoadESTagIndexMap.
	TagIndexMap map[string]string

	// IsDiscardWhenBlocked presumably allows dropping messages when the sender is blocked.
	IsDiscardWhenBlocked bool
}

type FluentSender

// FluentSender embeds *BaseSender and *FluentSenderCfg, so the base sender's
// methods and the configuration fields are promoted onto it.
// It is created with NewFluentSender.
type FluentSender struct {
	*BaseSender
	*FluentSenderCfg
}

func NewFluentSender

func NewFluentSender(cfg *FluentSenderCfg) *FluentSender

func (*FluentSender) GetName

func (s *FluentSender) GetName() string

func (*FluentSender) Spawn

func (s *FluentSender) Spawn(ctx context.Context) chan<- *libs.FluentMsg

type FluentSenderCfg

// FluentSenderCfg is the configuration accepted by NewFluentSender.
//
// NOTE(review): the per-field notes below are inferred from the field names
// only; confirm them against the sender implementation.
type FluentSenderCfg struct {
	// Name is presumably the sender name reported by GetName.
	Name string

	// Addr is presumably the address of the downstream fluent endpoint.
	Addr string

	// Tags is presumably the list of tags this sender accepts.
	Tags []string

	// BatchSize is presumably the maximum number of messages sent per batch.
	BatchSize int

	// InChanSize is presumably the capacity of the channel returned by Spawn.
	InChanSize int

	// NFork is presumably the number of parallel workers started by Spawn.
	NFork int

	// MaxWait is presumably the longest time a partial batch is held back.
	MaxWait time.Duration

	// IsDiscardWhenBlocked presumably allows dropping messages when the sender is blocked.
	IsDiscardWhenBlocked bool

	// ConcatCfg is an untyped settings map; its schema is not visible here — TODO confirm.
	ConcatCfg map[string]interface{}
}

type HTTPSender

// HTTPSender embeds *BaseSender and *HTTPSenderCfg, so the base sender's
// methods and the configuration fields are promoted onto it.
// It is created with NewHTTPSender.
type HTTPSender struct {
	*BaseSender
	*HTTPSenderCfg
	// contains filtered or unexported fields
}

func NewHTTPSender

func NewHTTPSender(cfg *HTTPSenderCfg) *HTTPSender

func (*HTTPSender) GetName

func (s *HTTPSender) GetName() string

func (*HTTPSender) SendBulkMsgs

func (s *HTTPSender) SendBulkMsgs(bulkCtx *bulkOpCtx, msgs []*libs.FluentMsg) (err error)

func (*HTTPSender) Spawn

func (s *HTTPSender) Spawn(ctx context.Context) chan<- *libs.FluentMsg

type HTTPSenderCfg

// HTTPSenderCfg is the configuration accepted by NewHTTPSender.
//
// NOTE(review): the per-field notes below are inferred from the field names
// only; confirm them against the sender implementation.
type HTTPSenderCfg struct {
	// Name is presumably the sender name reported by GetName.
	Name string

	// Addr is presumably the URL or address of the HTTP endpoint.
	Addr string

	// Tags is presumably the list of tags this sender accepts.
	Tags []string

	// BatchSize is presumably the maximum number of messages per request.
	BatchSize int

	// InChanSize is presumably the capacity of the channel returned by Spawn.
	InChanSize int

	// RetryChanSize is presumably the capacity of an internal retry channel.
	RetryChanSize int

	// NFork is presumably the number of parallel workers started by Spawn.
	NFork int

	// MaxWait is presumably the longest time a partial batch is held back.
	MaxWait time.Duration

	// IsDiscardWhenBlocked presumably allows dropping messages when the sender is blocked.
	IsDiscardWhenBlocked bool
}

type KafkaSender

// KafkaSender embeds *BaseSender and *KafkaSenderCfg, so the base sender's
// methods and the configuration fields are promoted onto it.
// It is created with NewKafkaSender.
type KafkaSender struct {
	*BaseSender
	*KafkaSenderCfg
}

func NewKafkaSender

func NewKafkaSender(cfg *KafkaSenderCfg) *KafkaSender

func (*KafkaSender) GetName

func (s *KafkaSender) GetName() string

func (*KafkaSender) Spawn

func (s *KafkaSender) Spawn(ctx context.Context) chan<- *libs.FluentMsg

type KafkaSenderCfg

// KafkaSenderCfg is the configuration accepted by NewKafkaSender.
//
// NOTE(review): the per-field notes below are inferred from the field names
// only; confirm them against the sender implementation.
type KafkaSenderCfg struct {
	// Name is presumably the sender name reported by GetName.
	Name string

	// TagKey is presumably the message key under which the tag is stored.
	TagKey string

	// Brokers is presumably the broker address list handed to NewKafkaProducer.
	Brokers []string

	// Topic is presumably the Kafka topic messages are produced to.
	Topic string

	// Tags is presumably the list of tags this sender accepts.
	Tags []string

	// InChanSize is presumably the capacity of the channel returned by Spawn.
	InChanSize int

	// NFork is presumably the number of parallel workers started by Spawn.
	NFork int

	// BatchSize is presumably the maximum number of messages sent per batch.
	BatchSize int

	// MaxWait is presumably the longest time a partial batch is held back.
	MaxWait time.Duration

	// IsDiscardWhenBlocked presumably allows dropping messages when the sender is blocked.
	IsDiscardWhenBlocked bool
}

type SenderItf

// SenderItf is the method set shared by the senders in this package: each
// concrete sender declares Spawn and GetName itself, and the remaining methods
// are declared on BaseSender.
type SenderItf interface {
	// Spawn takes a context and returns the sender's input channel (inChan).
	Spawn(ctx context.Context) chan<- *libs.FluentMsg
	// GetName returns the sender's name.
	GetName() string
	// IsTagSupported reports whether the given tag is supported by the sender.
	IsTagSupported(tag string) bool
	// DiscardWhenBlocked reports the sender's discard-when-blocked setting.
	DiscardWhenBlocked() bool

	// The setters below hand the sender its shared pool, tag list and channels.
	SetMsgPool(msgPool *sync.Pool)
	SetSupportedTags(tags []string)
	SetCommitChan(commitChan chan<- *libs.FluentMsg)
	SetSuccessedChan(successedChan chan<- *libs.FluentMsg)
	SetFailedChan(failedChan chan<- *libs.FluentMsg)
}

type StdoutSender added in v1.13.1

// StdoutSender embeds utils.Counter, *BaseSender and *StdoutSenderCfg, so the
// counter's and base sender's methods and the configuration fields are
// promoted onto it. It is created with NewStdoutSender.
type StdoutSender struct {
	utils.Counter

	*BaseSender
	*StdoutSenderCfg
	// contains filtered or unexported fields
}

StdoutSender prints or discards the messages it receives.

func NewStdoutSender added in v1.13.1

func NewStdoutSender(cfg *StdoutSenderCfg) *StdoutSender

NewStdoutSender creates a new StdoutSender.

func (*StdoutSender) GetName added in v1.13.1

func (s *StdoutSender) GetName() string

GetName returns the name of the StdoutSender.

func (*StdoutSender) Spawn added in v1.13.1

func (s *StdoutSender) Spawn(ctx context.Context) chan<- *libs.FluentMsg

Spawn starts (forks) the sender and returns its input channel.

type StdoutSenderCfg added in v1.13.1

// StdoutSenderCfg is the configuration accepted by NewStdoutSender.
//
// NOTE(review): the per-field notes below are inferred from the field names
// only; confirm them against the sender implementation.
type StdoutSenderCfg struct {
	// Name is presumably the sender name reported by GetName.
	Name string

	// LogLevel is presumably the level at which messages are printed.
	LogLevel string

	// Tags is presumably the list of tags this sender accepts.
	Tags []string

	// NFork is presumably the number of parallel workers started by Spawn.
	NFork int

	// InChanSize is presumably the capacity of the channel returned by Spawn.
	InChanSize int

	// IsCommit presumably controls whether handled messages are committed.
	IsCommit bool

	// IsDiscardWhenBlocked presumably allows dropping messages when the sender is blocked.
	IsDiscardWhenBlocked bool
}

StdoutSenderCfg is the configuration of StdoutSender.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL