esutil

package
v7.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 18, 2020 | License: Apache-2.0 | Imports: 13 | Imported by: 119

Documentation

Overview

Package esutil provides helper utilities to the Go client for Elasticsearch.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewJSONReader

func NewJSONReader(v interface{}) io.Reader

NewJSONReader encodes v into JSON and returns it as an io.Reader.

Types

type BulkIndexer added in v7.7.0

// BulkIndexer represents a parallel, asynchronous, efficient indexer for Elasticsearch.
type BulkIndexer interface {
	// Add adds an item to the indexer. It returns an error when the item cannot be added.
	// The returned error only covers adding the item; the result of the operation itself
	// is delivered through the item's OnSuccess and OnFailure callbacks.
	//
	// You must call the Close() method after you're done adding items.
	//
	// It is safe for concurrent use. When it's called from goroutines,
	// they must finish before the call to Close, e.g. by using sync.WaitGroup.
	Add(context.Context, BulkIndexerItem) error

	// Close waits until all added items are flushed and closes the indexer.
	Close(context.Context) error

	// Stats returns indexer statistics.
	Stats() BulkIndexerStats
}

BulkIndexer represents a parallel, asynchronous, efficient indexer for Elasticsearch.

func NewBulkIndexer added in v7.7.0

func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error)

NewBulkIndexer creates a new bulk indexer.

Example
// Clear the standard logger's output flags.
//
log.SetFlags(0)

// Create the Elasticsearch client
//
es, err := elasticsearch.NewClient(elasticsearch.Config{
	// Retry on 502, 503 and 504 statuses, and also on 429 TooManyRequests
	//
	RetryOnStatus: []int{502, 503, 504, 429},

	// A simple incremental backoff function: for argument i it returns i * 100ms
	//
	RetryBackoff: func(i int) time.Duration { return time.Duration(i) * 100 * time.Millisecond },

	// Retry up to 5 attempts
	//
	MaxRetries: 5,
})
if err != nil {
	log.Fatalf("Error creating the client: %s", err)
}

// Create the indexer
//
indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
	Client:     es,     // The Elasticsearch client
	Index:      "test", // The default index name
	NumWorkers: 4,      // The number of worker goroutines (default: number of CPUs)
	FlushBytes: 5e+6,   // The flush threshold in bytes (default: 5M)
})
if err != nil {
	log.Fatalf("Error creating the indexer: %s", err)
}

// Add an item to the indexer. The returned error only says whether the item
// could be added; the operation result arrives via OnSuccess/OnFailure.
//
err = indexer.Add(
	context.Background(),
	esutil.BulkIndexerItem{
		// Action field configures the operation to perform (index, create, delete, update)
		Action: "index",

		// DocumentID is the optional document ID
		DocumentID: "1",

		// Body is an `io.Reader` with the payload
		Body: strings.NewReader(`{"title":"Test"}`),

		// OnSuccess is the optional callback for each successful operation
		OnSuccess: func(
			ctx context.Context,
			item esutil.BulkIndexerItem,
			res esutil.BulkIndexerResponseItem,
		) {
			fmt.Printf("[%d] %s test/%s", res.Status, res.Result, item.DocumentID)
		},

		// OnFailure is the optional callback for each failed operation
		OnFailure: func(
			ctx context.Context,
			item esutil.BulkIndexerItem,
			res esutil.BulkIndexerResponseItem, err error,
		) {
			// Report err when it is set; otherwise report the error type
			// and reason carried in the response item.
			if err != nil {
				log.Printf("ERROR: %s", err)
			} else {
				log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
			}
		},
	},
)
if err != nil {
	log.Fatalf("Unexpected error: %s", err)
}

// Close the indexer; Close waits until all added items are flushed
//
if err := indexer.Close(context.Background()); err != nil {
	log.Fatalf("Unexpected error: %s", err)
}

// Report the indexer statistics; the run is treated as fatal when any item failed
//
stats := indexer.Stats()
if stats.NumFailed > 0 {
	log.Fatalf("Indexed [%d] documents with [%d] errors", stats.NumFlushed, stats.NumFailed)
} else {
	log.Printf("Successfully indexed [%d] documents", stats.NumFlushed)
}

// For optimal performance, consider using a third-party package for JSON decoding and HTTP transport.
//
// For more information, examples and benchmarks, see:
//
// --> https://github.com/elastic/go-elasticsearch/tree/master/_examples/bulk
Output:

type BulkIndexerConfig added in v7.7.0

// BulkIndexerConfig represents configuration of the indexer.
// It is the argument passed to NewBulkIndexer.
type BulkIndexerConfig struct {
	NumWorkers    int           // The number of workers. Defaults to runtime.NumCPU().
	FlushBytes    int           // The flush threshold in bytes. Defaults to 5MB.
	FlushInterval time.Duration // The flush threshold as duration. Defaults to 30sec.

	Client      *elasticsearch.Client   // The Elasticsearch client.
	Decoder     BulkResponseJSONDecoder // A custom JSON decoder.
	DebugLogger BulkIndexerDebugLogger  // An optional logger for debugging.

	// NOTE(review): OnFlushStart returns a context.Context; presumably that
	// context is the one used for the flush and handed to OnFlushEnd — confirm
	// against the implementation.
	OnError      func(context.Context, error)          // Called for indexer errors.
	OnFlushStart func(context.Context) context.Context // Called when the flush starts.
	OnFlushEnd   func(context.Context)                 // Called when the flush ends.

	// Parameters of the Bulk API.
	Index               string // The default index name.
	ErrorTrace          bool
	FilterPath          []string
	Header              http.Header
	Human               bool
	Pipeline            string
	Pretty              bool
	Refresh             string
	Routing             string
	Source              []string
	SourceExcludes      []string
	SourceIncludes      []string
	Timeout             time.Duration
	WaitForActiveShards string
}

BulkIndexerConfig represents configuration of the indexer.

type BulkIndexerDebugLogger added in v7.7.0

// BulkIndexerDebugLogger defines the interface for a debugging logger.
// It is the type of the optional BulkIndexerConfig.DebugLogger field.
type BulkIndexerDebugLogger interface {
	// Printf takes a string followed by a variadic list of values
	// (presumably a format string and its arguments — confirm).
	Printf(string, ...interface{})
}

BulkIndexerDebugLogger defines the interface for a debugging logger.

type BulkIndexerItem added in v7.7.0

// BulkIndexerItem represents an indexer item.
// Items are handed to the indexer with BulkIndexer.Add.
type BulkIndexerItem struct {
	// Index is presumably the target index for this item, overriding the
	// default BulkIndexerConfig.Index — TODO confirm.
	Index string
	// Action configures the operation to perform (index, create, delete, update).
	Action string
	// DocumentID is the optional document ID.
	DocumentID string
	// Body is an io.Reader with the payload.
	Body io.Reader
	// RetryOnConflict is a pointer, so it can be left nil; its exact meaning is
	// not shown here (presumably the retry_on_conflict setting for update
	// actions — TODO confirm).
	RetryOnConflict *int

	// OnSuccess is the optional callback for each successful operation;
	// OnFailure is the optional callback for each failed operation.
	OnSuccess func(context.Context, BulkIndexerItem, BulkIndexerResponseItem)        // Per item
	OnFailure func(context.Context, BulkIndexerItem, BulkIndexerResponseItem, error) // Per item
}

BulkIndexerItem represents an indexer item.

type BulkIndexerResponse added in v7.7.0

// BulkIndexerResponse represents the Elasticsearch response.
// It is the target type that a BulkResponseJSONDecoder fills in.
type BulkIndexerResponse struct {
	// Took is decoded from the "took" JSON field (unit not stated here;
	// presumably milliseconds — confirm against the Bulk API documentation).
	Took int `json:"took"`
	// HasErrors is decoded from the "errors" JSON field.
	HasErrors bool `json:"errors"`
	// Items is decoded from the "items" JSON field. Each element is a map from a
	// string key (presumably the action name — confirm) to the response item.
	Items []map[string]BulkIndexerResponseItem `json:"items,omitempty"`
}

BulkIndexerResponse represents the Elasticsearch response.

type BulkIndexerResponseItem added in v7.7.0

// BulkIndexerResponseItem represents the Elasticsearch response item.
// It is passed to the per-item OnSuccess and OnFailure callbacks.
type BulkIndexerResponseItem struct {
	Index      string `json:"_index"`        // Decoded from "_index".
	DocumentID string `json:"_id"`           // Decoded from "_id".
	Version    int64  `json:"_version"`      // Decoded from "_version".
	Result     string `json:"result"`        // Decoded from "result".
	Status     int    `json:"status"`        // Decoded from "status".
	SeqNo      int64  `json:"_seq_no"`       // Decoded from "_seq_no".
	PrimTerm   int64  `json:"_primary_term"` // Decoded from "_primary_term".

	// Shards holds the total/successful/failed counters decoded from "_shards".
	Shards struct {
		Total      int `json:"total"`
		Successful int `json:"successful"`
		Failed     int `json:"failed"`
	} `json:"_shards"`

	// Error holds the error type and reason decoded from "error", plus the
	// nested "caused_by" type and reason in Cause.
	//
	// NOTE(review): encoding/json never treats a struct value as empty, so
	// omitempty on this field only matters if the type is marshaled, and even
	// then has no effect — confirm whether that is intended.
	Error struct {
		Type   string `json:"type"`
		Reason string `json:"reason"`
		Cause  struct {
			Type   string `json:"type"`
			Reason string `json:"reason"`
		} `json:"caused_by"`
	} `json:"error,omitempty"`
}

BulkIndexerResponseItem represents the Elasticsearch response item.

type BulkIndexerStats added in v7.7.0

// BulkIndexerStats represents the indexer statistics.
// It is the value returned by BulkIndexer.Stats.
//
// The package example reports NumFlushed as the number of indexed documents
// and NumFailed as the number of errors. The exact semantics of the remaining
// counters are not spelled out here; the names suggest per-outcome item counts
// and a count of bulk requests — TODO confirm against the implementation.
type BulkIndexerStats struct {
	NumAdded    uint64
	NumFlushed  uint64
	NumFailed   uint64
	NumIndexed  uint64
	NumCreated  uint64
	NumUpdated  uint64
	NumDeleted  uint64
	NumRequests uint64
}

BulkIndexerStats represents the indexer statistics.

type BulkResponseJSONDecoder added in v7.7.0

// BulkResponseJSONDecoder defines the interface for custom JSON decoders.
// An implementation can be supplied through BulkIndexerConfig.Decoder.
type BulkResponseJSONDecoder interface {
	// UnmarshalFromReader takes an io.Reader and a pointer to a
	// BulkIndexerResponse; it is expected to decode the data read from the
	// reader into that response and report any failure as an error.
	UnmarshalFromReader(io.Reader, *BulkIndexerResponse) error
}

BulkResponseJSONDecoder defines the interface for custom JSON decoders.

type JSONEncoder

// JSONEncoder defines the interface for custom JSON encoders.
type JSONEncoder interface {
	// EncodeJSON takes an io.Writer and returns an error (presumably it writes
	// the JSON encoding of the implementing value to the writer — confirm).
	EncodeJSON(io.Writer) error
}

JSONEncoder defines the interface for custom JSON encoders.

type JSONReader

// JSONReader represents a reader which takes an interface value, encodes it
// into JSON, and wraps it in an io.Reader.
type JSONReader struct {
	// contains filtered or unexported fields
}

JSONReader represents a reader which takes an interface value, encodes it into JSON, and wraps it in an io.Reader.

func (*JSONReader) Read

func (r *JSONReader) Read(p []byte) (int, error)

Read implements the io.Reader interface.

func (*JSONReader) WriteTo

func (r *JSONReader) WriteTo(w io.Writer) (int64, error)

WriteTo implements the io.WriterTo interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL