Documentation ¶
Overview ¶
Package es contains all the components required to communicate with Elasticsearch.
The main component is the es.Indexer which takes the messages sent over a channel , bulks them together before sending them as a bulk to the ES Bulk API endpoint.
Index ¶
- func RetryBackoffFunc(minimumBackoff time.Duration, logger zerolog.Logger, metrics *metrics.Metrics) func(attempt int) time.Duration
- type Indexer
- type IndexerOption
- func WithBulkTimeout(timeout time.Duration) IndexerOption
- func WithCustomLogger(logger zerolog.Logger) IndexerOption
- func WithDiscoverNodesInterval(discoverInterval time.Duration) IndexerOption
- func WithDiscoverNodesOnStart(enabled bool) IndexerOption
- func WithESClientMetrics(collectorFunc func(*elasticsearch.Client) prometheus.Collector, ...) IndexerOption
- func WithFlushInterval(interval time.Duration) IndexerOption
- func WithMaxBufferedBytes(maxBytes int) IndexerOption
- func WithMaxBufferedMsgs(maxMsgs int) IndexerOption
- func WithMaxDeadPercentage(maxPercentage int) IndexerOption
- func WithMaxRetries(retries int) IndexerOption
- func WithRetryBackoffFunc(retryBackoffFunc func(attempt int) time.Duration) IndexerOption
- func WithRetryOnTimeout() IndexerOption
- func WithRetryStatuses(statuses ...int) IndexerOption
- func WithTransport(transport http.RoundTripper) IndexerOption
- type Logger
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Indexer ¶
type Indexer struct {
// contains filtered or unexported fields
}
func NewIndexer ¶
func NewIndexer( endpoint, port string, msgPool *bytebufferpool.Pool, metrics indexerMetrics, logger zerolog.Logger, opts ...IndexerOption, ) (*Indexer, error)
NewIndexer creates a new Indexer instance with the provided client.
func (*Indexer) CheckESStatus ¶
func (*Indexer) Start ¶
func (i *Indexer) Start( consumerChan <-chan *bytebufferpool.ByteBuffer, errorChan chan<- *bytebufferpool.ByteBuffer, ) error
Start starts the indexing of the messages coming from the sending channel.
If the inbound/sending channel is closed the indexer shutsdown.
This should be run in a Goroutine, preferably using the golang.org/x/sync/errgroup for grouping all the concurrent Goroutines.
type IndexerOption ¶
type IndexerOption func(ic *indexerConfig)
IndexerOption is the option type for setting Elastic client configuration values. See https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis for more information.
func WithBulkTimeout ¶
func WithBulkTimeout(timeout time.Duration) IndexerOption
WithBulkTimeout sets the timeout duration of the bulk calls.
func WithCustomLogger ¶
func WithCustomLogger(logger zerolog.Logger) IndexerOption
WithCustomLogger sets a logger that implements "estransport.Logger" interface on the client.
func WithDiscoverNodesInterval ¶
func WithDiscoverNodesInterval(discoverInterval time.Duration) IndexerOption
func WithDiscoverNodesOnStart ¶
func WithDiscoverNodesOnStart(enabled bool) IndexerOption
func WithESClientMetrics ¶
func WithESClientMetrics(collectorFunc func(*elasticsearch.Client) prometheus.Collector, promReg *prometheus.Registry) IndexerOption
WithESClientMetrics enables the client metrics and registers a prometheus collector with the default registry.
func WithFlushInterval ¶
func WithFlushInterval(interval time.Duration) IndexerOption
WithFlushInterval sets the interval between flushes to ES.
func WithMaxBufferedBytes ¶
func WithMaxBufferedBytes(maxBytes int) IndexerOption
WithMaxBufferedBytes sets the maximum amount of bytes to be buffered before flushing to ES.
func WithMaxBufferedMsgs ¶
func WithMaxBufferedMsgs(maxMsgs int) IndexerOption
WithMaxBufferedMsgs sets the maximum amount of msgs to be buffered before flushing to ES.
func WithMaxDeadPercentage ¶
func WithMaxDeadPercentage(maxPercentage int) IndexerOption
WithMaxDeadPercentage sets the maxmimum allowed percentage of dead connections before reloading the connection pool.
func WithMaxRetries ¶
func WithMaxRetries(retries int) IndexerOption
WithMaxRetries allows specifying the maximum number of retries to attempt.
func WithRetryBackoffFunc ¶
func WithRetryBackoffFunc(retryBackoffFunc func(attempt int) time.Duration) IndexerOption
WithRetryBackoffFunc sets the retry backoff function, that takes the current attempt count and returns a backoff duration. The retry backoff func is also the best place to add retry metrics.
func WithRetryOnTimeout ¶
func WithRetryOnTimeout() IndexerOption
WithRetryOnTimeout enables retries on connection timeouts.
func WithRetryStatuses ¶
func WithRetryStatuses(statuses ...int) IndexerOption
WithRetryStatuses sets the HTTP Status Codes that should be retried.
func WithTransport ¶
func WithTransport(transport http.RoundTripper) IndexerOption
WithTransport allows setting a custom http.RoundTripper instance.
type Logger ¶
Logger implements the estransport.Logger interface.
func (*Logger) LogRoundTrip ¶
func (l *Logger) LogRoundTrip( req *http.Request, res *http.Response, err error, start time.Time, dur time.Duration, ) error
LogRoundTrip prints the information about request and response.
func (*Logger) RequestBodyEnabled ¶
RequestBodyEnabled makes the client pass request body to logger.
func (*Logger) ResponseBodyEnabled ¶
RequestBodyEnabled makes the client pass response body to logger.