sender

package module
v0.75.3 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 11, 2026 License: Apache-2.0 Imports: 15 Imported by: 9

Documentation

Overview

Package sender provides log message sending functionality

Index

Constants

const (
	// DefaultWorkersPerQueue - By default most pipelines will only require a single sender worker, as the single worker itself can
	// concurrently transmit multiple http requests at once. This value is not intended to be configurable, but legacy
	// usages of the sender will override this value where necessary. If there is a desire to edit the concurrency of the senders
	// via config, see the BatchMaxConcurrentSend endpoint setting.
	DefaultWorkersPerQueue = 1

	// DefaultQueuesCount - By default most pipelines will only require a single queue, as the single queue itself can
	// concurrently transmit multiple http requests at once. Systems forced into a legacy mode will override this value.
	DefaultQueuesCount = 1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type DestinationFactory added in v0.66.0

type DestinationFactory func(id string) *client.Destinations

DestinationFactory is used to generate client destinations on each call.

type DestinationSender

type DestinationSender struct {
	// contains filtered or unexported fields
}

DestinationSender wraps a destination to send messages. It blocks when the buffer is full, but does not block while the destination is retrying.

func NewDestinationSender

func NewDestinationSender(config pkgconfigmodel.Reader, destination client.Destination, output chan *message.Payload, bufferSize int) *DestinationSender

NewDestinationSender creates a new DestinationSender

func (*DestinationSender) NonBlockingSend

func (d *DestinationSender) NonBlockingSend(payload *message.Payload) bool

NonBlockingSend tries to send the payload and fails silently if the input is full. It returns false if the buffer is full and true if the payload was sent.
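The non-blocking behavior described above is commonly written in Go as a select with a default branch. The sketch below only illustrates that idiom: payload and tryEnqueue are hypothetical stand-ins, not part of this package, and the package's own implementation may differ.

```go
package main

import "fmt"

// payload is a hypothetical stand-in for message.Payload.
type payload struct{ data string }

// tryEnqueue is a hypothetical helper, not the package's code.
// It returns true if p fit into the buffer and false if the buffer was full.
func tryEnqueue(input chan *payload, p *payload) bool {
	select {
	case input <- p:
		return true
	default:
		// The buffer is full: give up instead of waiting.
		return false
	}
}

func main() {
	input := make(chan *payload, 1) // room for a single payload

	fmt.Println(tryEnqueue(input, &payload{data: "first"}))  // true
	fmt.Println(tryEnqueue(input, &payload{data: "second"})) // false
}
```

Because the caller only gets a boolean back, it has to decide for itself what to do with a payload that was not accepted.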

func (*DestinationSender) Send

func (d *DestinationSender) Send(payload *message.Payload) bool

Send sends a payload and blocks if the input is full. It does not block if the destination is retrying payloads, and it cancels a blocked attempt if the retry state changes.
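One way to picture these semantics is a send that is skipped while the destination retries and that can be interrupted by a cancellation signal. This is a sketch under assumptions: sendUnlessRetrying, the cancel channel, and the retrying flag are hypothetical names chosen for illustration, and how DestinationSender tracks retry state internally is not documented here.

```go
package main

import "fmt"

// payload is a hypothetical stand-in for message.Payload.
type payload struct{ data string }

// sendUnlessRetrying is a hypothetical helper, not the package's code.
// It returns false right away when retrying is true. Otherwise it blocks until
// the payload is enqueued (true) or a signal arrives on cancel (false).
func sendUnlessRetrying(input chan *payload, cancel <-chan struct{}, retrying bool, p *payload) bool {
	if retrying {
		return false
	}
	select {
	case input <- p:
		return true
	case <-cancel:
		return false
	}
}

func main() {
	input := make(chan *payload, 1)
	cancel := make(chan struct{}, 1)

	// The buffer has room, so the send succeeds.
	fmt.Println(sendUnlessRetrying(input, cancel, false, &payload{data: "a"})) // true

	// The destination is retrying, so the send is skipped without blocking.
	fmt.Println(sendUnlessRetrying(input, cancel, true, &payload{data: "b"})) // false

	// The buffer is now full; a pending cancel signal ends the blocked attempt.
	cancel <- struct{}{}
	fmt.Println(sendUnlessRetrying(input, cancel, false, &payload{data: "c"})) // false
}
```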

func (*DestinationSender) Stop

func (d *DestinationSender) Stop()

Stop stops the DestinationSender

type MessageBuffer

type MessageBuffer struct {
	// contains filtered or unexported fields
}

MessageBuffer accumulates message metadata to a buffer until the max capacity is reached.

func NewMessageBuffer

func NewMessageBuffer(batchSizeLimit int, contentSizeLimit int) *MessageBuffer

NewMessageBuffer returns a new MessageBuffer.

func (*MessageBuffer) AddMessage

func (p *MessageBuffer) AddMessage(message *message.Message) bool

AddMessage adds a message to the buffer if there is still some free space, returns true if the message was added.

func (*MessageBuffer) Clear

func (p *MessageBuffer) Clear()

Clear reinitializes the buffer.

func (*MessageBuffer) ContentSizeLimit

func (p *MessageBuffer) ContentSizeLimit() int

ContentSizeLimit returns the configured content size limit. Messages above this limit are not accepted.

func (*MessageBuffer) GetMessages

func (p *MessageBuffer) GetMessages() []*message.MessageMetadata

GetMessages returns the messages stored in the buffer.

func (*MessageBuffer) IsEmpty

func (p *MessageBuffer) IsEmpty() bool

IsEmpty returns true if the buffer is empty.

func (*MessageBuffer) IsFull

func (p *MessageBuffer) IsFull() bool

IsFull returns true if the buffer is full.
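The two limits passed to NewMessageBuffer can be read as a cap on the number of messages and a cap on their combined size. The following sketch shows that reading with a hypothetical buffer type that stores raw byte slices; the names and the exact full-buffer rule are assumptions, not the package's implementation.

```go
package main

import "fmt"

// buffer is a hypothetical stand-in for MessageBuffer.
type buffer struct {
	messages         [][]byte
	batchSizeLimit   int // maximum number of messages
	contentSize      int // bytes currently stored
	contentSizeLimit int // maximum total bytes
}

func newBuffer(batchSizeLimit, contentSizeLimit int) *buffer {
	return &buffer{
		messages:         make([][]byte, 0, batchSizeLimit),
		batchSizeLimit:   batchSizeLimit,
		contentSizeLimit: contentSizeLimit,
	}
}

// add stores content only if both limits would still be respected.
func (b *buffer) add(content []byte) bool {
	if len(b.messages) >= b.batchSizeLimit || b.contentSize+len(content) > b.contentSizeLimit {
		return false
	}
	b.messages = append(b.messages, content)
	b.contentSize += len(content)
	return true
}

// isFull reports whether either limit has been reached.
func (b *buffer) isFull() bool {
	return len(b.messages) == b.batchSizeLimit || b.contentSize == b.contentSizeLimit
}

func (b *buffer) isEmpty() bool { return len(b.messages) == 0 }

// clear drops all stored messages and resets the byte counter.
func (b *buffer) clear() {
	b.messages = b.messages[:0]
	b.contentSize = 0
}

func main() {
	b := newBuffer(2, 10)
	fmt.Println(b.add([]byte("hello")))  // true: 5 of 10 bytes used
	fmt.Println(b.add([]byte("world!"))) // false: 5+6 bytes would exceed 10
	fmt.Println(b.add([]byte("abc")))    // true: 8 bytes, 2 messages
	fmt.Println(b.isFull())              // true: message count limit reached
	b.clear()
	fmt.Println(b.isEmpty()) // true
}
```

A caller would typically flush and clear the buffer when add returns false, then retry the rejected message.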

type Mock added in v0.66.0

type Mock struct {
	// contains filtered or unexported fields
}

Mock represents a mocked sender that fulfills the pipeline component interface

func NewMockSender added in v0.66.0

func NewMockSender() *Mock

NewMockSender generates a mock sender

func (*Mock) In added in v0.66.0

func (s *Mock) In() chan *message.Payload

In returns a self-emptying chan

func (*Mock) PipelineMonitor added in v0.66.0

func (s *Mock) PipelineMonitor() metrics.PipelineMonitor

PipelineMonitor returns an instance of NoopPipelineMonitor

func (*Mock) Start added in v0.66.0

func (s *Mock) Start()

Start begins the routine that empties the In channel

func (*Mock) Stop added in v0.66.0

func (s *Mock) Stop()

Stop closes the in channel
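A "self-emptying" channel can be built from a goroutine that discards whatever arrives. The sketch below is a minimal illustration with hypothetical names (drain, payload); unlike what is documented for Mock, its Stop also waits for the goroutine to exit, which is an addition made here to keep the example deterministic.

```go
package main

import "fmt"

// payload is a hypothetical stand-in for message.Payload.
type payload struct{ data string }

// drain is a hypothetical stand-in for a mock sender.
type drain struct {
	in   chan *payload
	done chan struct{}
}

func newDrain() *drain {
	return &drain{in: make(chan *payload, 1), done: make(chan struct{})}
}

// In returns the channel that producers write to.
func (d *drain) In() chan *payload { return d.in }

// Start launches a goroutine that discards everything received on the channel.
func (d *drain) Start() {
	go func() {
		defer close(d.done)
		for range d.in {
		}
	}()
}

// Stop closes the channel and waits for the draining goroutine to exit.
func (d *drain) Stop() {
	close(d.in)
	<-d.done
}

func main() {
	d := newDrain()
	d.Start()
	for i := 0; i < 100; i++ {
		// Sends keep succeeding because the goroutine empties the channel.
		d.In() <- &payload{data: "x"}
	}
	d.Stop()
	fmt.Println("drained")
}
```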

type MockServerlessMeta added in v0.67.0

type MockServerlessMeta struct {
	// contains filtered or unexported fields
}

MockServerlessMeta is a struct that contains essential control structures for serverless mode. Do not access any methods on this struct without checking IsEnabled first.

func NewMockServerlessMeta added in v0.67.0

func NewMockServerlessMeta(isEnabled bool) *MockServerlessMeta

NewMockServerlessMeta returns a new MockServerlessMeta

func (*MockServerlessMeta) IsEnabled added in v0.67.0

func (s *MockServerlessMeta) IsEnabled() bool

IsEnabled returns true if the serverless mode is enabled.

func (*MockServerlessMeta) Lock added in v0.67.0

func (s *MockServerlessMeta) Lock()

Lock is a no-op for the mock serverless meta.

func (*MockServerlessMeta) SenderDoneChan added in v0.67.0

func (s *MockServerlessMeta) SenderDoneChan() chan *sync.WaitGroup

SenderDoneChan returns the channel used to transfer wait groups from the sync_destination to the sender.

func (*MockServerlessMeta) Unlock added in v0.67.0

func (s *MockServerlessMeta) Unlock()

Unlock is a no-op for the mock serverless meta.

func (*MockServerlessMeta) WaitGroup added in v0.67.0

func (s *MockServerlessMeta) WaitGroup() *sync.WaitGroup

WaitGroup returns the wait group for the serverless mode.

type NoopSink added in v0.67.0

type NoopSink struct{}

NoopSink is a Sink implementation that does nothing. It is used when there is no need to hook an auditor to the sender.

func (*NoopSink) Channel added in v0.67.0

func (t *NoopSink) Channel() chan *message.Payload

Channel returns a nil channel

type PipelineComponent added in v0.66.0

type PipelineComponent interface {
	In() chan *message.Payload
	PipelineMonitor() metrics.PipelineMonitor
	Start()
	Stop()
}

PipelineComponent abstracts a pipeline component

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

Sender can distribute payloads on multiple underlying workers

func NewSender

func NewSender(
	config pkgconfigmodel.Reader,
	sink Sink,
	destinationFactory DestinationFactory,
	bufferSize int,
	serverlessMeta ServerlessMeta,
	queueCount int,
	workersPerQueue int,
	pipelineMonitor metrics.PipelineMonitor,
) *Sender

NewSender returns a new sender.

func (*Sender) In added in v0.66.0

func (s *Sender) In() chan *message.Payload

In is the input channel of a worker set.

func (*Sender) PipelineMonitor added in v0.66.0

func (s *Sender) PipelineMonitor() metrics.PipelineMonitor

PipelineMonitor returns the pipeline monitor of the sender workers.

func (*Sender) Start

func (s *Sender) Start()

Start starts all sender workers.

func (*Sender) Stop

func (s *Sender) Stop()

Stop stops all sender workers
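Distributing payloads over several workers is often done by letting the workers share one input channel. The sketch below shows that general pattern only; consume and payload are hypothetical, and how Sender actually maps queueCount and workersPerQueue onto goroutines is not described on this page.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// payload is a hypothetical stand-in for message.Payload.
type payload struct{ data string }

// consume starts the given number of workers on one shared input channel and
// returns how many payloads they handled once the channel is closed and drained.
func consume(in <-chan *payload, workers int) int64 {
	var handled int64
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range in {
				// A real worker would transmit the payload here.
				atomic.AddInt64(&handled, 1)
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&handled)
}

func main() {
	in := make(chan *payload, 10)
	go func() {
		for i := 0; i < 50; i++ {
			in <- &payload{data: "x"}
		}
		close(in)
	}()
	fmt.Println(consume(in, 3)) // 50
}
```

Each payload is received by exactly one worker, so the total is independent of the worker count.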

type Serializer

type Serializer interface {
	Serialize(message *message.Message, writer io.Writer) error
	Finish(writer io.Writer) error
	Reset()
}

Serializer transforms a batch of messages into a payload. It is the one rendering the messages (i.e. either directly using raw []byte data from unstructured messages or turning structured messages into []byte data).

func NewArraySerializer added in v0.68.0

func NewArraySerializer() Serializer

NewArraySerializer creates a new arraySerializer
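To make the Serialize, Finish, and Reset contract concrete, here is a sketch of a serializer that renders a batch as a JSON-style array. It assumes that an array serializer writes "[", comma-separated message contents, and "]"; arrayWriter, msg, and render are hypothetical names, and the serializer returned by NewArraySerializer may behave differently.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// msg is a hypothetical stand-in for message.Message holding rendered content.
type msg struct{ content []byte }

// arrayWriter is a hypothetical serializer that emits "[m1,m2,...]".
type arrayWriter struct{ started bool }

// Serialize writes "[" before the first message and "," before every later one.
func (a *arrayWriter) Serialize(m *msg, w io.Writer) error {
	sep := "["
	if a.started {
		sep = ","
	}
	a.started = true
	if _, err := io.WriteString(w, sep); err != nil {
		return err
	}
	_, err := w.Write(m.content)
	return err
}

// Finish closes the array; an empty batch is rendered as "[]".
func (a *arrayWriter) Finish(w io.Writer) error {
	end := "]"
	if !a.started {
		end = "[]"
	}
	_, err := io.WriteString(w, end)
	return err
}

// Reset prepares the serializer for the next batch.
func (a *arrayWriter) Reset() { a.started = false }

// render serializes a whole batch into a single payload body.
func render(contents ...string) (string, error) {
	var buf bytes.Buffer
	s := &arrayWriter{}
	for _, c := range contents {
		if err := s.Serialize(&msg{content: []byte(c)}, &buf); err != nil {
			return "", err
		}
	}
	if err := s.Finish(&buf); err != nil {
		return "", err
	}
	s.Reset()
	return buf.String(), nil
}

func main() {
	out, err := render(`{"a":1}`, `{"b":2}`)
	fmt.Println(out, err) // [{"a":1},{"b":2}] <nil>
}
```

Writing to an io.Writer rather than returning a byte slice lets the caller stream the output directly into a compressor or request body.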

type ServerlessMeta added in v0.67.0

type ServerlessMeta interface {
	Lock()
	Unlock()
	WaitGroup() *sync.WaitGroup
	SenderDoneChan() chan *sync.WaitGroup
	IsEnabled() bool
}

ServerlessMeta is an interface that exposes essential control structures for serverless mode. Do not call any other method on this interface without checking IsEnabled first.

func NewServerlessMeta added in v0.67.0

func NewServerlessMeta(isEnabled bool) ServerlessMeta

NewServerlessMeta creates a new ServerlessMeta instance.
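The requirement to check IsEnabled first suggests a guard around every use of the other methods. The sketch below illustrates such a guard with a hypothetical meta type whose wait group is nil when serverless mode is off; that nil behavior, the track helper, and the omission of SenderDoneChan are assumptions made for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// meta is a hypothetical stand-in; a disabled meta carries no wait group.
type meta struct {
	enabled bool
	mu      sync.Mutex
	wg      *sync.WaitGroup
}

func newMeta(enabled bool) *meta {
	m := &meta{enabled: enabled}
	if enabled {
		m.wg = &sync.WaitGroup{}
	}
	return m
}

func (m *meta) IsEnabled() bool            { return m.enabled }
func (m *meta) Lock()                      { m.mu.Lock() }
func (m *meta) Unlock()                    { m.mu.Unlock() }
func (m *meta) WaitGroup() *sync.WaitGroup { return m.wg }

// track registers one in-flight payload, but only in serverless mode.
// It reports whether anything was registered.
func track(m *meta) bool {
	if !m.IsEnabled() {
		return false // no other method on m is touched
	}
	m.Lock()
	m.WaitGroup().Add(1)
	m.Unlock()
	return true
}

func main() {
	on, off := newMeta(true), newMeta(false)
	fmt.Println(track(off)) // false
	fmt.Println(track(on))  // true
	on.WaitGroup().Done()   // the payload was delivered
	on.WaitGroup().Wait()   // returns immediately: nothing is in flight
}
```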

type Sink added in v0.67.0

type Sink interface {
	Channel() chan *message.Payload
}

Sink is the component that messages are sent to once the sender has finished processing them.
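Since a Sink may hand back a nil channel (as NoopSink does), code that forwards payloads has to account for Go's rule that a plain send on a nil channel blocks forever. The sketch below shows one defensive pattern; sink, noopSink, chanSink, and deliver are hypothetical stand-ins, and whether the sender in this package uses a nil check is an assumption.

```go
package main

import "fmt"

// payload is a hypothetical stand-in for message.Payload.
type payload struct{ data string }

// sink mirrors the shape of the Sink interface with the stand-in payload type.
type sink interface {
	Channel() chan *payload
}

// noopSink returns a nil channel, meaning there is nothing to forward to.
type noopSink struct{}

func (noopSink) Channel() chan *payload { return nil }

// chanSink forwards to a real channel.
type chanSink struct{ ch chan *payload }

func (s chanSink) Channel() chan *payload { return s.ch }

// deliver hands p to the sink and reports whether it was forwarded.
// It skips nil channels, because a plain send on a nil channel blocks forever.
func deliver(s sink, p *payload) bool {
	ch := s.Channel()
	if ch == nil {
		return false
	}
	ch <- p
	return true
}

func main() {
	audited := chanSink{ch: make(chan *payload, 1)}
	fmt.Println(deliver(noopSink{}, &payload{data: "x"})) // false
	fmt.Println(deliver(audited, &payload{data: "x"}))    // true
}
```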

type Strategy

type Strategy interface {
	Start()
	Stop()
}

Strategy should contain all logic to send logs to a remote destination and forward them to the next stage of the pipeline. In the logs pipeline, the strategy implementation should convert a stream of incoming Messages to a stream of Payloads that the sender can handle. A strategy is startable and stoppable so that the pipeline can manage its lifecycle.

func NewBatchStrategy

func NewBatchStrategy(
	inputChan chan *message.Message,
	outputChan chan *message.Payload,
	flushChan chan struct{},
	serverlessMeta ServerlessMeta,
	batchWait time.Duration,
	maxBatchSize int,
	maxContentSize int,
	pipelineName string,
	compression compression.Compressor,
	pipelineMonitor metrics.PipelineMonitor,
	instanceID string,
) Strategy

NewBatchStrategy returns a new concurrent batch strategy with the specified batch and content size limits.
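The parameters of NewBatchStrategy (a batch wait duration, a maximum batch size, and a flush channel) point to a loop that emits a batch when it is full, when a timer fires, or when a flush is requested. The sketch below shows that loop in its simplest form. It is not the package's implementation: runBatcher and collect are hypothetical, messages are plain strings, and content size limits, serialization, and compression are left out.

```go
package main

import (
	"fmt"
	"time"
)

// runBatcher groups messages from in into batches of at most maxBatch and sends
// them on out. A batch is also emitted when the ticker fires, when flush
// signals, and when in is closed. wait must be greater than zero.
func runBatcher(in <-chan string, out chan<- []string, flush <-chan struct{}, maxBatch int, wait time.Duration) {
	defer close(out)
	ticker := time.NewTicker(wait)
	defer ticker.Stop()

	var batch []string
	emit := func() {
		if len(batch) > 0 {
			out <- batch
			batch = nil
		}
	}
	for {
		select {
		case m, ok := <-in:
			if !ok {
				emit() // flush what is left before stopping
				return
			}
			batch = append(batch, m)
			if len(batch) >= maxBatch {
				emit()
			}
		case <-ticker.C:
			emit()
		case <-flush:
			emit()
		}
	}
}

// collect feeds msgs through runBatcher and gathers the resulting batches.
// The long wait and nil flush channel leave batch size as the only trigger.
func collect(msgs []string, maxBatch int) [][]string {
	in := make(chan string)
	out := make(chan []string)
	go runBatcher(in, out, nil, maxBatch, time.Hour)
	go func() {
		for _, m := range msgs {
			in <- m
		}
		close(in)
	}()
	var batches [][]string
	for b := range out {
		batches = append(batches, b)
	}
	return batches
}

func main() {
	fmt.Println(collect([]string{"a", "b", "c", "d", "e"}, 2)) // [[a b] [c d] [e]]
}
```

The timer bounds how long a message can sit in a partially filled batch, while the size limit bounds how large a single payload can grow.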

func NewStreamStrategy

func NewStreamStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, compression compression.Compressor) Strategy

NewStreamStrategy creates a new stream strategy

Directories

Path    Synopsis
http    Package http manages creation of http-based senders
tcp     Package tcp manages creation of tcp-based senders
