Published: Jul 7, 2020 | License: MIT


type Config

type Config struct {
	IndexerConcurrency       dynamicconfig.IntPropertyFn
	ESProcessorNumOfWorkers  dynamicconfig.IntPropertyFn
	ESProcessorBulkActions   dynamicconfig.IntPropertyFn // max number of requests in bulk
	ESProcessorBulkSize      dynamicconfig.IntPropertyFn // max total size of bytes in bulk
	ESProcessorFlushInterval dynamicconfig.DurationPropertyFn
	ValidSearchAttributes    dynamicconfig.MapPropertyFn

Config contains all configs for indexer

type ESProcessor

type ESProcessor interface {
	// Stop processor and clean up
	// Add request to bulk, and record kafka message in map with provided key
	// This call will be blocked when downstream has issues
	Add(request elastic.BulkableRequest, key string, kafkaMsg messaging.Message)

ESProcessor is interface for elastic search bulk processor

func NewESProcessorAndStart

func NewESProcessorAndStart(config *Config, client es.Client, processorName string,
	logger log.Logger, metricsClient metrics.Client, msgEncoder codec.BinaryEncoder) (ESProcessor, error)

NewESProcessorAndStart create new ESProcessor and start

type ElasticBulkProcessor

type ElasticBulkProcessor interface {
	Start(ctx context.Context) error
	Stop() error
	Close() error
	Stats() elastic.BulkProcessorStats
	Add(request elastic.BulkableRequest)
	Flush() error

ElasticBulkProcessor is interface for elastic.BulkProcessor (elastic package doesn't provide such interface that tests can mock)

type Indexer

type Indexer struct {
	// contains filtered or unexported fields

Indexer used to consumer data from kafka then send to ElasticSearch

func NewIndexer

func NewIndexer(config *Config, client messaging.Client, esClient es.Client, esConfig *es.Config,
	logger log.Logger, metricsClient metrics.Client) *Indexer

NewIndexer create a new Indexer

func (*Indexer) Start

func (x *Indexer) Start() error

Start indexer

func (*Indexer) Stop

func (x *Indexer) Stop()

Stop indexer

