Documentation ¶
Overview ¶
Package sender provides log message sending functionality
Index ¶
Constants ¶
const (
	// DefaultWorkersPerQueue - By default most pipelines will only require a single sender worker, as the single worker itself can
	// concurrently transmit multiple HTTP requests at once. This value is not intended to be configurable, but legacy
	// usages of the sender will override this value where necessary. If there is a desire to edit the concurrency of the senders
	// via config, see the BatchMaxConcurrentSend endpoint setting.
	DefaultWorkersPerQueue = 1

	// DefaultQueuesCount - By default most pipelines will only require a single queue, as the single queue itself can
	// concurrently transmit multiple HTTP requests at once. Systems forced into a legacy mode will override this value.
	DefaultQueuesCount = 1
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DestinationFactory ¶ added in v0.66.0
type DestinationFactory func(id string) *client.Destinations
DestinationFactory is used to generate client destinations on each call.
type DestinationSender ¶
type DestinationSender struct {
// contains filtered or unexported fields
}
DestinationSender wraps a destination to send messages, blocking on a full buffer but not blocking while the destination is retrying.
func NewDestinationSender ¶
func NewDestinationSender(config pkgconfigmodel.Reader, destination client.Destination, output chan *message.Payload, bufferSize int) *DestinationSender
NewDestinationSender creates a new DestinationSender
func (*DestinationSender) NonBlockingSend ¶
func (d *DestinationSender) NonBlockingSend(payload *message.Payload) bool
NonBlockingSend tries to send the payload and fails silently if the input is full. It returns false if the buffer is full and true if the payload was sent.
func (*DestinationSender) Send ¶
func (d *DestinationSender) Send(payload *message.Payload) bool
Send sends a payload and blocks if the input is full. It does not block if the destination is retrying payloads, and it cancels a blocking attempt if the retry state changes.
func (*DestinationSender) Stop ¶
func (d *DestinationSender) Stop()
Stop stops the DestinationSender
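The difference between Send and NonBlockingSend can be illustrated with plain channels. This is a sketch under assumptions, not the DestinationSender source: `payload`, `trySend`, `sendUnlessRetrying`, and the `retrying` signal channel are hypothetical names, and the real type may track retry state differently.

```go
package main

import "fmt"

// payload is a hypothetical stand-in for *message.Payload.
type payload struct{ body string }

// trySend is a non-blocking send: it returns false right away when the
// buffered channel is full instead of waiting for room.
func trySend(input chan *payload, p *payload) bool {
	select {
	case input <- p:
		return true
	default:
		return false
	}
}

// sendUnlessRetrying blocks until the payload is accepted, but gives up and
// returns false as soon as the retrying channel signals a retry state.
func sendUnlessRetrying(input chan *payload, retrying <-chan struct{}, p *payload) bool {
	select {
	case input <- p:
		return true
	case <-retrying:
		return false
	}
}

func main() {
	input := make(chan *payload, 1)
	fmt.Println(trySend(input, &payload{"a"})) // true: the buffer has room
	fmt.Println(trySend(input, &payload{"b"})) // false: the buffer is full

	retrying := make(chan struct{})
	close(retrying) // simulate the destination entering a retry state
	fmt.Println(sendUnlessRetrying(input, retrying, &payload{"c"})) // false
}
```

The `default` branch is what makes the first helper non-blocking; the second helper has no `default`, so it waits until either the send succeeds or the retry signal fires.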
type MessageBuffer ¶
type MessageBuffer struct {
// contains filtered or unexported fields
}
MessageBuffer accumulates message metadata in a buffer until the max capacity is reached.
func NewMessageBuffer ¶
func NewMessageBuffer(batchSizeLimit int, contentSizeLimit int) *MessageBuffer
NewMessageBuffer returns a new MessageBuffer.
func (*MessageBuffer) AddMessage ¶
func (p *MessageBuffer) AddMessage(message *message.Message) bool
AddMessage adds a message to the buffer if there is still some free space, returns true if the message was added.
func (*MessageBuffer) ContentSizeLimit ¶
func (p *MessageBuffer) ContentSizeLimit() int
ContentSizeLimit returns the configured content size limit. Messages above this limit are not accepted.
func (*MessageBuffer) GetMessages ¶
func (p *MessageBuffer) GetMessages() []*message.MessageMetadata
GetMessages returns the messages stored in the buffer.
func (*MessageBuffer) IsEmpty ¶
func (p *MessageBuffer) IsEmpty() bool
IsEmpty returns true if the buffer is empty.
func (*MessageBuffer) IsFull ¶
func (p *MessageBuffer) IsFull() bool
IsFull returns true if the buffer is full.
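As a rough illustration of a buffer bounded by both a message count and a total content size, here is a minimal sketch. The `boundedBuffer` type, its fields, and the exact full/reject rules are assumptions made for this example; MessageBuffer's internals are unexported and may differ.

```go
package main

import "fmt"

// msg is a hypothetical stand-in for *message.Message.
type msg struct{ content []byte }

// boundedBuffer (hypothetical) accepts messages until either the message
// count or the accumulated content size would exceed its limit.
type boundedBuffer struct {
	msgs             []*msg
	contentSize      int
	batchSizeLimit   int
	contentSizeLimit int
}

func newBoundedBuffer(batchSizeLimit, contentSizeLimit int) *boundedBuffer {
	return &boundedBuffer{batchSizeLimit: batchSizeLimit, contentSizeLimit: contentSizeLimit}
}

// add appends m when both limits still allow it and reports whether it did.
func (b *boundedBuffer) add(m *msg) bool {
	size := len(m.content)
	if len(b.msgs) >= b.batchSizeLimit || b.contentSize+size > b.contentSizeLimit {
		return false
	}
	b.msgs = append(b.msgs, m)
	b.contentSize += size
	return true
}

// isFull reports whether either limit has been reached.
func (b *boundedBuffer) isFull() bool {
	return len(b.msgs) >= b.batchSizeLimit || b.contentSize >= b.contentSizeLimit
}

// isEmpty reports whether no message has been accepted yet.
func (b *boundedBuffer) isEmpty() bool { return len(b.msgs) == 0 }

func main() {
	buf := newBoundedBuffer(2, 10)
	fmt.Println(buf.add(&msg{[]byte("hello")}))   // true: 1 message, 5 bytes
	fmt.Println(buf.add(&msg{[]byte("toolong")})) // false: 5+7 exceeds 10 bytes
	fmt.Println(buf.add(&msg{[]byte("hey")}))     // true: 2 messages, 8 bytes
	fmt.Println(buf.isFull())                     // true: batch limit reached
}
```

A caller would typically flush and reset the buffer when `add` returns false, then retry the rejected message.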
type Mock ¶ added in v0.66.0
type Mock struct {
// contains filtered or unexported fields
}
Mock represents a mocked sender that fulfills the pipeline component interface
func NewMockSender ¶ added in v0.66.0
func NewMockSender() *Mock
NewMockSender generates a mock sender
func (*Mock) PipelineMonitor ¶ added in v0.66.0
func (s *Mock) PipelineMonitor() metrics.PipelineMonitor
PipelineMonitor returns an instance of NoopPipelineMonitor
type MockServerlessMeta ¶ added in v0.67.0
type MockServerlessMeta struct {
// contains filtered or unexported fields
}
MockServerlessMeta is a struct that contains essential control structures for serverless mode. Do not access any methods on this struct without checking IsEnabled first.
func NewMockServerlessMeta ¶ added in v0.67.0
func NewMockServerlessMeta(isEnabled bool) *MockServerlessMeta
NewMockServerlessMeta returns a new MockServerlessMeta
func (*MockServerlessMeta) IsEnabled ¶ added in v0.67.0
func (s *MockServerlessMeta) IsEnabled() bool
IsEnabled returns true if the serverless mode is enabled.
func (*MockServerlessMeta) Lock ¶ added in v0.67.0
func (s *MockServerlessMeta) Lock()
Lock is a no-op for the mock serverless meta.
func (*MockServerlessMeta) SenderDoneChan ¶ added in v0.67.0
func (s *MockServerlessMeta) SenderDoneChan() chan *sync.WaitGroup
SenderDoneChan returns the channel used to transfer wait groups from the sync_destination to the sender.
func (*MockServerlessMeta) Unlock ¶ added in v0.67.0
func (s *MockServerlessMeta) Unlock()
Unlock is a no-op for the mock serverless meta.
func (*MockServerlessMeta) WaitGroup ¶ added in v0.67.0
func (s *MockServerlessMeta) WaitGroup() *sync.WaitGroup
WaitGroup returns the wait group for the serverless mode.
type NoopSink ¶ added in v0.67.0
type NoopSink struct{}
NoopSink is a Sink implementation that does nothing. It is used when there is no need to hook an auditor to the sender.
type PipelineComponent ¶ added in v0.66.0
type PipelineComponent interface {
In() chan *message.Payload
PipelineMonitor() metrics.PipelineMonitor
Start()
Stop()
}
PipelineComponent abstracts a pipeline component
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
Sender can distribute payloads on multiple underlying workers
func NewSender ¶
func NewSender(
	config pkgconfigmodel.Reader,
	sink Sink,
	destinationFactory DestinationFactory,
	bufferSize int,
	serverlessMeta ServerlessMeta,
	queueCount int,
	workersPerQueue int,
	pipelineMonitor metrics.PipelineMonitor,
) *Sender
NewSender returns a new sender.
func (*Sender) PipelineMonitor ¶ added in v0.66.0
func (s *Sender) PipelineMonitor() metrics.PipelineMonitor
PipelineMonitor returns the pipeline monitor of the sender workers.
type Serializer ¶
type Serializer interface {
Serialize(message *message.Message, writer io.Writer) error
Finish(writer io.Writer) error
Reset()
}
Serializer transforms a batch of messages into a payload. It is the one rendering the messages (i.e. either directly using raw []byte data from unstructured messages or turning structured messages into []byte data).
func NewArraySerializer ¶ added in v0.68.0
func NewArraySerializer() Serializer
NewArraySerializer creates a new arraySerializer
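To show how the three Serializer methods fit together, here is a sketch of a serializer that renders messages as a JSON-style array. It assumes the array form is `[` + comma-separated messages + `]`; `arrayWriter`, `msg`, and `render` are hypothetical names, and the unexported arraySerializer may behave differently, for example on an empty batch.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// msg is a hypothetical stand-in for *message.Message with pre-rendered content.
type msg struct{ content []byte }

// arrayWriter (hypothetical) has the same three-method shape as Serializer.
type arrayWriter struct{ count int }

// Serialize writes "[" before the first message and "," before the others.
func (a *arrayWriter) Serialize(m *msg, w io.Writer) error {
	prefix := ","
	if a.count == 0 {
		prefix = "["
	}
	if _, err := io.WriteString(w, prefix); err != nil {
		return err
	}
	if _, err := w.Write(m.content); err != nil {
		return err
	}
	a.count++
	return nil
}

// Finish closes the array; an empty batch is rendered as "[]" in this sketch.
func (a *arrayWriter) Finish(w io.Writer) error {
	closing := "]"
	if a.count == 0 {
		closing = "[]"
	}
	_, err := io.WriteString(w, closing)
	return err
}

// Reset prepares the serializer for the next batch.
func (a *arrayWriter) Reset() { a.count = 0 }

// render serializes one batch and resets the serializer afterwards.
func render(s *arrayWriter, contents ...string) (string, error) {
	var buf bytes.Buffer
	for _, c := range contents {
		if err := s.Serialize(&msg{[]byte(c)}, &buf); err != nil {
			return "", err
		}
	}
	if err := s.Finish(&buf); err != nil {
		return "", err
	}
	s.Reset()
	return buf.String(), nil
}

func main() {
	out, err := render(&arrayWriter{}, `{"a":1}`, `{"b":2}`)
	fmt.Println(out, err) // [{"a":1},{"b":2}] <nil>
}
```

Writing to an `io.Writer` lets the caller stream the batch straight into a compressor or buffer without building the whole payload in memory first.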
type ServerlessMeta ¶ added in v0.67.0
type ServerlessMeta interface {
Lock()
Unlock()
WaitGroup() *sync.WaitGroup
SenderDoneChan() chan *sync.WaitGroup
IsEnabled() bool
}
ServerlessMeta is an interface that exposes essential control structures for serverless mode. Do not call any other method on this interface without checking IsEnabled first.
func NewServerlessMeta ¶ added in v0.67.0
func NewServerlessMeta(isEnabled bool) ServerlessMeta
NewServerlessMeta creates a new ServerlessMeta instance.
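The "check IsEnabled first" rule can be sketched as a guard at every call site. The example below is an assumption-heavy illustration: `meta` copies the method set listed above, while `localMeta`, `newLocalMeta`, and `trackPayload` are hypothetical, including the guess that a disabled instance leaves its wait group and channel unallocated.

```go
package main

import (
	"fmt"
	"sync"
)

// meta mirrors the method set of the ServerlessMeta interface listed above.
type meta interface {
	Lock()
	Unlock()
	WaitGroup() *sync.WaitGroup
	SenderDoneChan() chan *sync.WaitGroup
	IsEnabled() bool
}

// localMeta is a hypothetical implementation; Lock and Unlock come from the
// embedded mutex. When disabled, wg and done stay nil (an assumption).
type localMeta struct {
	sync.Mutex
	enabled bool
	wg      *sync.WaitGroup
	done    chan *sync.WaitGroup
}

func newLocalMeta(enabled bool) *localMeta {
	m := &localMeta{enabled: enabled}
	if enabled {
		m.wg = &sync.WaitGroup{}
		m.done = make(chan *sync.WaitGroup, 1)
	}
	return m
}

func (m *localMeta) WaitGroup() *sync.WaitGroup           { return m.wg }
func (m *localMeta) SenderDoneChan() chan *sync.WaitGroup { return m.done }
func (m *localMeta) IsEnabled() bool                      { return m.enabled }

// trackPayload registers one in-flight payload, but only in serverless mode.
// It reports whether anything was registered.
func trackPayload(m meta) bool {
	if !m.IsEnabled() {
		return false // never touch the other methods when disabled
	}
	m.Lock()
	defer m.Unlock()
	m.WaitGroup().Add(1)
	return true
}

func main() {
	fmt.Println(trackPayload(newLocalMeta(false))) // false: guard short-circuits

	on := newLocalMeta(true)
	fmt.Println(trackPayload(on)) // true: one payload registered
	on.WaitGroup().Done()         // the send completed
	on.WaitGroup().Wait()         // returns immediately, nothing is pending
}
```

Without the guard, the disabled instance in this sketch would return a nil wait group and the `Add` call would panic, which is the kind of failure the documented rule helps avoid.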
type Sink ¶ added in v0.67.0
Sink is the component that messages are sent to once the sender has finished processing them.
type Strategy ¶
type Strategy interface {
Start()
Stop()
}
Strategy should contain all logic to send logs to a remote destination and forward them to the next stage of the pipeline. In the logs pipeline, the strategy implementation should convert a stream of incoming Messages to a stream of Payloads that the sender can handle. A strategy is startable and stoppable so that the pipeline can manage its lifecycle.
func NewBatchStrategy ¶
func NewBatchStrategy(
	inputChan chan *message.Message,
	outputChan chan *message.Payload,
	flushChan chan struct{},
	serverlessMeta ServerlessMeta,
	batchWait time.Duration,
	maxBatchSize int,
	maxContentSize int,
	pipelineName string,
	compression compression.Compressor,
	pipelineMonitor metrics.PipelineMonitor,
	instanceID string,
) Strategy
NewBatchStrategy returns a new concurrent batch strategy with the specified batch and content size limits.
func NewStreamStrategy ¶
func NewStreamStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, compression compression.Compressor) Strategy
NewStreamStrategy creates a new stream strategy
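The Strategy contract (turn a stream of messages into a stream of payloads, with Start and Stop controlling the lifecycle) can be sketched as follows. This is a simplified illustration, not either strategy in this package: `batcher` and `newBatcher` are hypothetical, strings stand in for messages and payloads, and time-based flushing, compression, and content size limits are left out.

```go
package main

import (
	"fmt"
	"strings"
)

// batcher (hypothetical) groups incoming messages into newline-joined payloads
// of at most maxBatchSize messages.
type batcher struct {
	in           chan string
	out          chan string
	maxBatchSize int
	done         chan struct{}
}

func newBatcher(in, out chan string, maxBatchSize int) *batcher {
	return &batcher{in: in, out: out, maxBatchSize: maxBatchSize, done: make(chan struct{})}
}

// Start launches the goroutine that converts messages into payloads.
func (b *batcher) Start() {
	go func() {
		defer close(b.done)
		var batch []string
		flush := func() {
			if len(batch) > 0 {
				b.out <- strings.Join(batch, "\n")
				batch = nil
			}
		}
		for m := range b.in {
			batch = append(batch, m)
			if len(batch) >= b.maxBatchSize {
				flush()
			}
		}
		flush() // input closed: emit whatever is left
	}()
}

// Stop closes the input and waits until the final payload has been emitted.
func (b *batcher) Stop() {
	close(b.in)
	<-b.done
}

func main() {
	in := make(chan string)
	out := make(chan string, 4)
	s := newBatcher(in, out, 2)
	s.Start()
	for _, m := range []string{"a", "b", "c"} {
		in <- m
	}
	s.Stop()
	close(out)
	for p := range out {
		fmt.Printf("%q\n", p) // "a\nb" then "c"
	}
}
```

Having Stop wait for the final flush means the pipeline can shut stages down in order without losing a partially filled batch.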