es

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package es contains all the components required to communicate with Elasticsearch.

The main component is the es.Indexer which takes the messages sent over a channel , bulks them together before sending them as a bulk to the ES Bulk API endpoint.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RetryBackoffFunc

func RetryBackoffFunc(
	minimumBackoff time.Duration,
	logger zerolog.Logger,
	metrics *metrics.Metrics,
) func(attempt int) time.Duration

RetryBackoffFunc returns a function for use as a retry backoff function, it uses an exponential backoff and increments the retry metrics.

Types

type Indexer

type Indexer struct {
	// contains filtered or unexported fields
}

func NewIndexer

func NewIndexer(
	endpoint, port string,
	msgPool *bytebufferpool.Pool,
	metrics indexerMetrics,
	logger zerolog.Logger,
	opts ...IndexerOption,
) (*Indexer, error)

NewIndexer creates a new Indexer instance with the provided client.

func (*Indexer) CheckESStatus

func (i *Indexer) CheckESStatus() error

func (*Indexer) Start

func (i *Indexer) Start(
	consumerChan <-chan *bytebufferpool.ByteBuffer,
	errorChan chan<- *bytebufferpool.ByteBuffer,
) error

Start starts the indexing of the messages coming from the sending channel.

If the inbound/sending channel is closed the indexer shutsdown.

This should be run in a Goroutine, preferably using the golang.org/x/sync/errgroup for grouping all the concurrent Goroutines.

type IndexerOption

type IndexerOption func(ic *indexerConfig)

IndexerOption is the option type for setting Elastic client configuration values. See https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis for more information.

func WithBulkTimeout

func WithBulkTimeout(timeout time.Duration) IndexerOption

WithBulkTimeout sets the timeout duration of the bulk calls.

func WithCustomLogger

func WithCustomLogger(logger zerolog.Logger) IndexerOption

WithCustomLogger sets a logger that implements "estransport.Logger" interface on the client.

func WithDiscoverNodesInterval

func WithDiscoverNodesInterval(discoverInterval time.Duration) IndexerOption

func WithDiscoverNodesOnStart

func WithDiscoverNodesOnStart(enabled bool) IndexerOption

func WithESClientMetrics

func WithESClientMetrics(collectorFunc func(*elasticsearch.Client) prometheus.Collector, promReg *prometheus.Registry) IndexerOption

WithESClientMetrics enables the client metrics and registers a prometheus collector with the default registry.

func WithFlushInterval

func WithFlushInterval(interval time.Duration) IndexerOption

WithFlushInterval sets the interval between flushes to ES.

func WithMaxBufferedBytes

func WithMaxBufferedBytes(maxBytes int) IndexerOption

WithMaxBufferedBytes sets the maximum amount of bytes to be buffered before flushing to ES.

func WithMaxBufferedMsgs

func WithMaxBufferedMsgs(maxMsgs int) IndexerOption

WithMaxBufferedMsgs sets the maximum amount of msgs to be buffered before flushing to ES.

func WithMaxDeadPercentage

func WithMaxDeadPercentage(maxPercentage int) IndexerOption

WithMaxDeadPercentage sets the maxmimum allowed percentage of dead connections before reloading the connection pool.

func WithMaxRetries

func WithMaxRetries(retries int) IndexerOption

WithMaxRetries allows specifying the maximum number of retries to attempt.

func WithRetryBackoffFunc

func WithRetryBackoffFunc(retryBackoffFunc func(attempt int) time.Duration) IndexerOption

WithRetryBackoffFunc sets the retry backoff function, that takes the current attempt count and returns a backoff duration. The retry backoff func is also the best place to add retry metrics.

func WithRetryOnTimeout

func WithRetryOnTimeout() IndexerOption

WithRetryOnTimeout enables retries on connection timeouts.

func WithRetryStatuses

func WithRetryStatuses(statuses ...int) IndexerOption

WithRetryStatuses sets the HTTP Status Codes that should be retried.

func WithTransport

func WithTransport(transport http.RoundTripper) IndexerOption

WithTransport allows setting a custom http.RoundTripper instance.

type Logger

type Logger struct {
	zerolog.Logger
}

Logger implements the estransport.Logger interface.

func (*Logger) LogRoundTrip

func (l *Logger) LogRoundTrip(
	req *http.Request,
	res *http.Response,
	err error,
	start time.Time,
	dur time.Duration,
) error

LogRoundTrip prints the information about request and response.

func (*Logger) RequestBodyEnabled

func (l *Logger) RequestBodyEnabled() bool

RequestBodyEnabled makes the client pass request body to logger.

func (*Logger) ResponseBodyEnabled

func (l *Logger) ResponseBodyEnabled() bool

RequestBodyEnabled makes the client pass response body to logger.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL