Documentation
¶
Index ¶
- Constants
- func ExponentialDuration(attempt int) time.Duration
- func NewHTTPSBatchWriter(binding *URLBinding, netConf NetworkTimeoutConfig, tlsConf *tls.Config, ...) egress.WriteCloser
- func NewHTTPSWriter(binding *URLBinding, netConf NetworkTimeoutConfig, tlsConf *tls.Config, ...) egress.WriteCloser
- func NewRetryWriter(urlBinding *URLBinding, retryDuration RetryDuration, maxRetries int, ...) (egress.WriteCloser, error)
- func NewTCPWriter(binding *URLBinding, netConf NetworkTimeoutConfig, ...) egress.WriteCloser
- func NewTLSWriter(binding *URLBinding, netConf NetworkTimeoutConfig, tlsConf *tls.Config, ...) egress.WriteCloser
- func NewWriterFactoryErrorf(u *url.URL, format string, a ...any) error
- func WithParallelRetries(n int)
- type Binding
- type ConnectorOption
- type Converter
- type ConverterOption
- type Credentials
- type DialFunc
- type Drain
- type DrainData
- type EnvelopeWriter
- type FilteringDrainWriter
- type HTTPSBatchWriter
- type HTTPSWriter
- type InternalRetryWriter
- type LogClient
- type NetworkTimeoutConfig
- type Option
- type RetryCoordinator
- type RetryDuration
- type RetryWriter
- type Retryer
- type SyslogConnector
- type TCPWriter
- type TLSWriter
- type URLBinding
- type WriterFactory
- type WriterFactoryError
Constants ¶
const RFC5424TimeOffsetNum = "2006-01-02T15:04:05.999999-07:00"
Variables ¶
This section is empty.
Functions ¶
func ExponentialDuration ¶
ExponentialDuration returns a duration that grows exponentially with each attempt. It is maxed out at 15 seconds.
func NewHTTPSBatchWriter ¶
func NewHTTPSBatchWriter( binding *URLBinding, netConf NetworkTimeoutConfig, tlsConf *tls.Config, egressMetric metrics.Counter, c *Converter, options ...Option, ) egress.WriteCloser
HTTPSBatchWriter is an egress.WriteCloser implementation that batches syslog messages and sends them via HTTPS in configurable batch sizes and intervals. It provides backpressure to upstream callers by using a blocking channel for incoming messages. Failed batch sends are retried according to a configurable retry policy, using a global RetryCoordinator to limit the number of concurrent retries across all drains. This prevents resource exhaustion and noisy neighbor issues, ensuring reliable and efficient delivery of batched syslog messages.
func NewHTTPSWriter ¶
func NewHTTPSWriter( binding *URLBinding, netConf NetworkTimeoutConfig, tlsConf *tls.Config, egressMetric metrics.Counter, c *Converter, ) egress.WriteCloser
func NewRetryWriter ¶
func NewRetryWriter( urlBinding *URLBinding, retryDuration RetryDuration, maxRetries int, writer egress.WriteCloser, ) (egress.WriteCloser, error)
func NewTCPWriter ¶
func NewTCPWriter( binding *URLBinding, netConf NetworkTimeoutConfig, egressMetric metrics.Counter, c *Converter, ) egress.WriteCloser
NewTCPWriter creates a new TCP syslog writer.
func NewTLSWriter ¶
func NewTLSWriter( binding *URLBinding, netConf NetworkTimeoutConfig, tlsConf *tls.Config, egressMetric metrics.Counter, syslogConverter *Converter, ) egress.WriteCloser
func NewWriterFactoryErrorf ¶
Types ¶
type ConnectorOption ¶
type ConnectorOption func(*SyslogConnector)
ConnectorOption allows a syslog connector to be customized.
func WithLogClient ¶
func WithLogClient(logClient LogClient, sourceIndex string) ConnectorOption
WithLogClient returns a ConnectorOption that will set up logging for any information about a binding.
type Converter ¶
type Converter struct {
// contains filtered or unexported fields
}
func NewConverter ¶
func NewConverter(opts ...ConverterOption) *Converter
func (*Converter) BuildHostname ¶
func (c *Converter) BuildHostname(env *loggregator_v2.Envelope, defaultHostname string) string
type ConverterOption ¶
type ConverterOption func(*Converter)
func WithoutSyslogMetadata ¶
func WithoutSyslogMetadata() ConverterOption
type Credentials ¶
type Drain ¶
type Drain struct { Url string `json:"url"` Credentials Credentials `json:"credentials"` }
type EnvelopeWriter ¶
type EnvelopeWriter struct {
// contains filtered or unexported fields
}
func NewEnvelopeWriter ¶
func NewEnvelopeWriter(drainGetter drainGetter, nextEnvelope nextEnvelope, ingress metrics.Counter, log *log.Logger) *EnvelopeWriter
func (*EnvelopeWriter) Run ¶
func (w *EnvelopeWriter) Run()
type FilteringDrainWriter ¶
type FilteringDrainWriter struct {
// contains filtered or unexported fields
}
func NewFilteringDrainWriter ¶
func NewFilteringDrainWriter(binding Binding, writer egress.Writer) (*FilteringDrainWriter, error)
func (*FilteringDrainWriter) Write ¶
func (w *FilteringDrainWriter) Write(env *loggregator_v2.Envelope) error
type HTTPSBatchWriter ¶
type HTTPSBatchWriter struct { HTTPSWriter // contains filtered or unexported fields }
func (*HTTPSBatchWriter) Close ¶
func (w *HTTPSBatchWriter) Close() error
func (*HTTPSBatchWriter) ConfigureRetry ¶
func (w *HTTPSBatchWriter) ConfigureRetry(retryDuration RetryDuration, maxRetries int)
Also Marks that HTTPSBatchWriter implements the InternalRetryWriter interface
func (*HTTPSBatchWriter) Write ¶
func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error
type HTTPSWriter ¶
type HTTPSWriter struct {
// contains filtered or unexported fields
}
func (*HTTPSWriter) Close ¶
func (*HTTPSWriter) Close() error
func (*HTTPSWriter) Write ¶
func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error
type InternalRetryWriter ¶
type InternalRetryWriter interface {
ConfigureRetry(retryDuration RetryDuration, maxRetries int)
}
InternalRetryWriter is an interface that defines methods for configuring retry behavior for syslog writers. It allows setting a retry duration function and the maximum number of retries.
type LogClient ¶
type LogClient interface {
EmitLog(message string, opts ...loggregator.EmitLogOption)
}
LogClient is used to emit logs.
type NetworkTimeoutConfig ¶
type NetworkTimeoutConfig struct { Keepalive time.Duration DialTimeout time.Duration WriteTimeout time.Duration }
NetworkTimeoutConfig stores various timeout values.
type Option ¶
type Option func(*HTTPSBatchWriter)
func WithBatchSize ¶
testing override for batch size and send interval
func WithSendInterval ¶
type RetryCoordinator ¶
type RetryCoordinator struct {
// contains filtered or unexported fields
}
RetryCoordinator provides global concurrency control for retry operations across all drain writers. It uses a semaphore pattern to enforce a configurable limit on the number of concurrent retries, preventing resource exhaustion and noisy-neighbor problems during periods of high retry load. When all retry slots are in use, additional retries will wait until a slot becomes available, ensuring the system remains stable while still attempting delivery of all messages. This coordinator will be used as a singleton.
func GetGlobalRetryCoordinator ¶
func GetGlobalRetryCoordinator() *RetryCoordinator
GetGlobalRetryCoordinator returns a singleton instance of RetryCoordinator. It initializes the coordinator with a semaphore that limits the number of concurrent retries.
func (*RetryCoordinator) Acquire ¶
func (c *RetryCoordinator) Acquire(redactedURLString string, appId string)
func (*RetryCoordinator) Release ¶
func (c *RetryCoordinator) Release()
type RetryDuration ¶
RetryDuration calculates a duration based on the number of write attempts.
type RetryWriter ¶
type RetryWriter struct { Writer egress.WriteCloser //public to allow testing // contains filtered or unexported fields }
RetryWriter wraps a WriteCloser and will retry writes if the first fails.
func (*RetryWriter) Close ¶
func (r *RetryWriter) Close() error
Close delegates to the syslog writer.
func (*RetryWriter) Write ¶
func (r *RetryWriter) Write(e *loggregator_v2.Envelope) error
Write will retry writes unitl maxRetries has been reached.
type Retryer ¶
type Retryer struct {
// contains filtered or unexported fields
}
func NewRetryer ¶
func NewRetryer( binding *URLBinding, retryDuration RetryDuration, maxRetries int, ) *Retryer
Retryer handles retry logic for failed operations with configurable policies. It coordinates with the global RetryCoordinator to limit concurrent retries, implements exponential backoff with configurable intervals, and respects context cancellation for graceful shutdown. The first attempt is always performed without acquiring a retry slot (fast path), while subsequent retries are subject to global concurrency limits.
type SyslogConnector ¶
type SyslogConnector struct {
// contains filtered or unexported fields
}
SyslogConnector creates the various egress syslog writers.
func NewSyslogConnector ¶
func NewSyslogConnector( skipCertVerify bool, wg egress.WaitGroup, f writerFactory, m metricClient, opts ...ConnectorOption, ) *SyslogConnector
NewSyslogConnector configures and returns a new SyslogConnector.
type TCPWriter ¶
type TCPWriter struct {
// contains filtered or unexported fields
}
TCPWriter represents a syslog writer that connects over unencrypted TCP. This writer is not meant to be used from multiple goroutines. The same goroutine that calls `.Write()` should be the one that calls `.Close()`.
type TLSWriter ¶
type TLSWriter struct {
TCPWriter
}
TLSWriter represents a syslog writer that connects over unencrypted TCP.
type URLBinding ¶
type URLBinding struct { Context context.Context AppID string Hostname string OmitMetadata bool InternalTls bool URL *url.URL PrivateKey []byte Certificate []byte CA []byte }
application is identified by AppID and Hostname. The syslog URL is identified by URL.
func (*URLBinding) Scheme ¶
func (u *URLBinding) Scheme() string
Scheme is a convenience wrapper around the *url.URL Scheme field
type WriterFactory ¶
type WriterFactory struct {
// contains filtered or unexported fields
}
func NewWriterFactory ¶
func NewWriterFactory(internalTlsConfig *tls.Config, externalTlsConfig *tls.Config, netConf NetworkTimeoutConfig, m metricClient) WriterFactory
func (WriterFactory) NewWriter ¶
func (f WriterFactory) NewWriter(ub *URLBinding) (egress.WriteCloser, error)
type WriterFactoryError ¶
func (WriterFactoryError) Error ¶
func (e WriterFactoryError) Error() string