senders

package
v1.8.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 28, 2019 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func LoadESTagIndexMap

func LoadESTagIndexMap(env string, mapi interface{}) map[string]string

func NewKafkaProducer

func NewKafkaProducer(brokers []string) (p sarama.SyncProducer, err error)

Types

type BaseSender

type BaseSender struct {
	IsDiscardWhenBlocked bool
	// contains filtered or unexported fields
}

BaseSender should not put msg into msgpool in sender

func (*BaseSender) DiscardWhenBlocked

func (s *BaseSender) DiscardWhenBlocked() bool

func (*BaseSender) IsTagSupported

func (s *BaseSender) IsTagSupported(tag string) bool

func (*BaseSender) SetCommitChan

func (s *BaseSender) SetCommitChan(commitChan chan<- int64)

func (*BaseSender) SetDiscardChan

func (s *BaseSender) SetDiscardChan(discardChan chan<- *libs.FluentMsg)

func (*BaseSender) SetDiscardWithoutCommitChan

func (s *BaseSender) SetDiscardWithoutCommitChan(discardWithoutCommitChan chan<- *libs.FluentMsg)

func (*BaseSender) SetMsgPool

func (s *BaseSender) SetMsgPool(msgPool *sync.Pool)

func (*BaseSender) SetSupportedTags

func (s *BaseSender) SetSupportedTags(tags []string)

type ESIndexResp

type ESIndexResp struct {
	Id     string `json:"_id"`
	Index  string `json:"_index"`
	Status int    `json:"status"`
}

type ESOpResp

type ESOpResp struct {
	Index *ESIndexResp `json:"index"`
}

type ESResp

type ESResp struct {
	Errors bool        `json:"errors"`
	Items  []*ESOpResp `json:"items"`
}

type ElasticSearchSender

type ElasticSearchSender struct {
	*BaseSender
	*ElasticSearchSenderCfg
	// contains filtered or unexported fields
}

func NewElasticSearchSender

func NewElasticSearchSender(cfg *ElasticSearchSenderCfg) *ElasticSearchSender

func (*ElasticSearchSender) GetName

func (s *ElasticSearchSender) GetName() string

func (*ElasticSearchSender) SendBulkMsgs

func (s *ElasticSearchSender) SendBulkMsgs(ctx *bulkOpCtx, msgs []*libs.FluentMsg) (err error)

func (*ElasticSearchSender) Spawn

func (s *ElasticSearchSender) Spawn(tag string) chan<- *libs.FluentMsg

type ElasticSearchSenderCfg

type ElasticSearchSenderCfg struct {
	Name, Addr, TagKey                          string
	Tags                                        []string
	BatchSize, InChanSize, RetryChanSize, NFork int
	MaxWait                                     time.Duration
	TagIndexMap                                 map[string]string
	IsDiscardWhenBlocked                        bool
}

type FluentSender

type FluentSender struct {
	*BaseSender
	*FluentSenderCfg
	// contains filtered or unexported fields
}

func NewFluentSender

func NewFluentSender(cfg *FluentSenderCfg) *FluentSender

func (*FluentSender) GetName

func (s *FluentSender) GetName() string

func (*FluentSender) Spawn

func (s *FluentSender) Spawn(tag string) chan<- *libs.FluentMsg

type FluentSenderCfg

type FluentSenderCfg struct {
	Name, Addr                                  string
	Tags                                        []string
	BatchSize, InChanSize, RetryChanSize, NFork int
	MaxWait                                     time.Duration
	IsDiscardWhenBlocked                        bool
}

type HTTPSender

type HTTPSender struct {
	*BaseSender
	*HTTPSenderCfg
	// contains filtered or unexported fields
}

func NewHTTPSender

func NewHTTPSender(cfg *HTTPSenderCfg) *HTTPSender

func (*HTTPSender) GetName

func (s *HTTPSender) GetName() string

func (*HTTPSender) SendBulkMsgs

func (s *HTTPSender) SendBulkMsgs(ctx *bulkOpCtx, msgs []*libs.FluentMsg) (err error)

func (*HTTPSender) Spawn

func (s *HTTPSender) Spawn(tag string) chan<- *libs.FluentMsg

type HTTPSenderCfg

type HTTPSenderCfg struct {
	Name, Addr                                  string
	Tags                                        []string
	BatchSize, InChanSize, RetryChanSize, NFork int
	MaxWait                                     time.Duration
	IsDiscardWhenBlocked                        bool
}

type KafkaSender

type KafkaSender struct {
	*BaseSender
	*KafkaSenderCfg
}

func NewKafkaSender

func NewKafkaSender(cfg *KafkaSenderCfg) *KafkaSender

func (*KafkaSender) GetName

func (s *KafkaSender) GetName() string

func (*KafkaSender) Spawn

func (s *KafkaSender) Spawn(tag string) chan<- *libs.FluentMsg

type KafkaSenderCfg

type KafkaSenderCfg struct {
	Name, TagKey                                string
	Brokers                                     []string
	Topic                                       string
	Tags                                        []string
	InChanSize, RetryChanSize, NFork, BatchSize int
	MaxWait                                     time.Duration
	IsDiscardWhenBlocked                        bool
}

type NullSender added in v1.8.9

type NullSender struct {
	*BaseSender
	*NullSenderCfg
}

NullSender /dev/null, will discard all msgs

func NewNullSender added in v1.8.9

func NewNullSender(cfg *NullSenderCfg) *NullSender

NewNullSender create new null sender

func (*NullSender) GetName added in v1.8.9

func (s *NullSender) GetName() string

GetName get the name of null sender

func (*NullSender) Spawn added in v1.8.9

func (s *NullSender) Spawn(tag string) chan<- *libs.FluentMsg

Spawn fork

type NullSenderCfg added in v1.8.9

type NullSenderCfg struct {
	Name, LogLevel                 string
	Tags                           []string
	InChanSize                     int
	IsCommit, IsDiscardWhenBlocked bool
}

NullSenderCfg configuration of NullSender

type SenderItf

type SenderItf interface {
	Spawn(string) chan<- *libs.FluentMsg // Spawn(tag) inChan
	IsTagSupported(string) bool
	DiscardWhenBlocked() bool
	GetName() string

	SetMsgPool(*sync.Pool)
	SetCommitChan(chan<- int64)
	SetSupportedTags([]string)
	SetDiscardChan(chan<- *libs.FluentMsg)
	SetDiscardWithoutCommitChan(chan<- *libs.FluentMsg)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL