esbulk

package module
v0.4.14
Published: Apr 26, 2018 License: MIT Imports: 13 Imported by: 0

README

Fast parallel bulk loading utility for elasticsearch. Data is read from a newline delimited JSON file or stdin and indexed into elasticsearch in bulk and in parallel. The shortest command would be:

$ esbulk -index my-index-name < file.ldj

Caveat: If indexing pressure on the bulk API is too high (dozens or hundreds of parallel workers, large batch sizes, depending on your setup), esbulk will halt and report an error:

$ esbulk -index my-index-name -w 100 file.ldj
2017/01/02 16:25:25 error during bulk operation, try less workers (lower -w value) or
                    increase thread_pool.bulk.queue_size in your nodes

Please note that, in such a case, some documents are indexed and some are not. Your index will be in an inconsistent state, since there is no transactional bracket around the indexing process.

However, using defaults (parallelism: number of cores) on a single-node setup will just work. For larger clusters, increase the number of workers until you see full CPU utilization. After that, more workers won't buy any more speed.

Installation

$ go get github.com/miku/esbulk/cmd/esbulk

For deb or rpm packages, see: https://github.com/miku/esbulk/releases

Usage

$ esbulk -h
  Usage of esbulk:
  -0    set the number of replicas to 0 during indexing
  -cpuprofile string
          write cpu profile to file
  -host string
          elasticsearch host (deprecated: use -server instead) (default "localhost")
  -id string
          name of field to use as id field, by default ids are autogenerated
  -index string
          index name
  -mapping string
          mapping string or filename to apply before indexing
  -memprofile string
          write heap profile to file
  -port int
          elasticsearch port (deprecated: use -server instead) (default 9200)
  -purge
          purge any existing index before indexing
  -server string
          elasticsearch server, this works with https as well (default "http://localhost:9200")
  -size int
          bulk batch size (default 1000)
  -type string
          elasticsearch doc type (default "default")
  -u string
          http basic auth username:password, like curl -u
  -v    prints current program version
  -verbose
          output basic progress
  -w int
          number of workers to use (default 4)
  -z    unzip gz'd file on the fly

To index a JSON file that contains one document per line, just run:

$ esbulk -index example file.ldj

Where file.ldj is line-delimited JSON, like:

{"name": "esbulk", "version": "0.2.4"}
{"name": "estab", "version": "0.1.3"}
...

By default esbulk will use as many parallel workers as there are cores. To tweak the indexing process, adjust the -size and -w parameters.

You can index from gzipped files as well, using the -z flag:

$ esbulk -z -index example file.ldj.gz

Starting with 0.3.7, the preferred method to set a non-default server host and port is via -server, e.g.

$ esbulk -server https://0.0.0.0:9201

This way, you can use https as well, which was not possible before. Options -host and -port are kept for backwards compatibility.

Reusing IDs

Since version 0.3.8: if you want to reuse IDs from your documents in elasticsearch, you can specify the ID field via the -id flag:

$ cat file.json
{"x": "doc-1", "db": "mysql"}
{"x": "doc-2", "db": "mongo"}

Here, we would like to reuse the ID from field x.

$ esbulk -id x -index throwaway -verbose file.json
...

$ curl -s http://localhost:9200/throwaway/_search | jq
{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1,
    "hits": [
      {
        "_index": "throwaway",
        "_type": "default",
        "_id": "doc-2",
        "_score": 1,
        "_source": {
          "x": "doc-2",
          "db": "mongo"
        }
      },
      {
        "_index": "throwaway",
        "_type": "default",
        "_id": "doc-1",
        "_score": 1,
        "_source": {
          "x": "doc-1",
          "db": "mysql"
        }
      }
    ]
  }
}

Nested ID fields

Version 0.4.3 adds support for nested ID fields:

$ cat fixtures/pr-8-1.json
{"a": {"b": 1}}
{"a": {"b": 2}}
{"a": {"b": 3}}

$ esbulk -index throwaway -id a.b < fixtures/pr-8-1.json
...
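Under the hood, a dotted path like a.b has to be resolved against each decoded document. A minimal sketch of such a lookup (the resolve helper here is hypothetical, not esbulk's actual code):

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// resolve walks a dotted path like "a.b" through a decoded JSON document,
// returning the value at the end of the path and whether it was found.
func resolve(doc map[string]interface{}, path string) (interface{}, bool) {
	var cur interface{} = doc
	for _, p := range strings.Split(path, ".") {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		cur, ok = m[p]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

func main() {
	var doc map[string]interface{}
	if err := json.Unmarshal([]byte(`{"a": {"b": 1}}`), &doc); err != nil {
		panic(err)
	}
	if v, ok := resolve(doc, "a.b"); ok {
		fmt.Println(v) // 1
	}
}
```

Note that encoding/json decodes JSON numbers into float64 when unmarshaling into interface{}, so a numeric ID field needs to be converted back to a string before use as an elasticsearch _id.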

Concatenated ID

Version 0.4.3 adds support for IDs that are the concatenation of multiple fields:

$ cat fixtures/pr-8-2.json
{"a": {"b": 1}, "c": "a"}
{"a": {"b": 2}, "c": "b"}
{"a": {"b": 3}, "c": "c"}

$ esbulk -index throwaway -id a.b,c < fixtures/pr-8-2.json
...

      {
        "_index": "xxx",
        "_type": "default",
        "_id": "1a",
        "_score": 1,
        "_source": {
          "a": {
            "b": 1
          },
          "c": "a"
        }
      },

Using X-Pack

Since 0.4.2: support for secured elasticsearch nodes:

$ esbulk -u elastic:changeme -index myindex file.ldj

A similar project has been started for solr, called solrbulk.

Contributors

Measurements

$ csvlook -I measurements.csv
| es    | esbulk | docs      | avg_b | nodes | total_cores | total_heap_gb | t_s   | docs_per_s | repl |
|-------|--------|-----------|-------|-------|-------------|---------------|-------|------------|------|
| 6.1.2 | 0.4.8  | 138000000 | 2000  | 1     | 32          |  64           |  6420 |  22100     | 1    |
| 6.1.2 | 0.4.8  | 138000000 | 2000  | 1     |  8          |  30           | 27360 |   5100     | 1    |
| 6.1.2 | 0.4.8  |   1000000 | 2000  | 1     |  4          |   1           |   300 |   3300     | 1    |
| 6.1.2 | 0.4.8  |  10000000 |   26  | 1     |  4          |   8           |   122 |  81000     | 1    |
| 6.1.2 | 0.4.8  |  10000000 |   26  | 1     | 32          |  64           |    32 | 307000     | 1    |
| 6.2.3 | 0.4.10 | 142944530 | 2000  | 2     | 64          | 128           | 26253 |   5444     | 1    |
| 6.2.3 | 0.4.10 | 142944530 | 2000  | 2     | 64          | 128           | 11113 |  12831     | 0    |
| 6.2.3 | 0.4.13 |  15000000 | 6000  | 2     | 64          | 128           |  2460 |   6400     | 0    |

Why not add a row?

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BulkIndex added in v0.3.0

func BulkIndex(docs []string, options Options) error

BulkIndex takes a set of documents as strings and indexes them into elasticsearch.
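The payload a function like BulkIndex has to produce is fixed by elasticsearch's _bulk API: newline-delimited, one action line followed by one document line per document. A minimal sketch of assembling such a body (bulkBody is a hypothetical helper, not part of this package):

```go
package main

import (
	"fmt"
	"strings"
)

// bulkBody builds the newline-delimited body expected by the _bulk API:
// for every document, an action line naming index and type, then the
// document itself on the next line.
func bulkBody(index, docType string, docs []string) string {
	var b strings.Builder
	for _, doc := range docs {
		fmt.Fprintf(&b, `{"index": {"_index": %q, "_type": %q}}`+"\n", index, docType)
		b.WriteString(strings.TrimSpace(doc) + "\n")
	}
	return b.String()
}

func main() {
	docs := []string{`{"name": "esbulk"}`, `{"name": "estab"}`}
	fmt.Print(bulkBody("example", "default", docs))
}
```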

func CreateIndex added in v0.3.5

func CreateIndex(options Options) error

CreateIndex creates a new index.

func DeleteIndex added in v0.3.5

func DeleteIndex(options Options) error

DeleteIndex removes an index.

func FlushIndex added in v0.4.14

func FlushIndex(idx int, options Options) error

func GetSettings added in v0.4.14

func GetSettings(idx int, options Options) (map[string]interface{}, error)

GetSettings fetches the settings of the index.

func PutMapping added in v0.3.5

func PutMapping(options Options, body io.Reader) error

PutMapping applies a mapping from a reader.

func Worker added in v0.3.0

func Worker(id string, options Options, lines chan string, wg *sync.WaitGroup)

Worker will batch index documents that come in on the lines channel.

Types

type ArrayFlags added in v0.4.13

type ArrayFlags []string

ArrayFlags allows storing lists of flag values.

func (*ArrayFlags) Set added in v0.4.13

func (f *ArrayFlags) Set(value string) error

Set appends a value.

func (*ArrayFlags) String added in v0.4.13

func (f *ArrayFlags) String() string
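With both Set and String, ArrayFlags satisfies the standard library's flag.Value interface, so a flag using it can be passed multiple times on the command line. A self-contained sketch using a local replica of the type:

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// arrayFlags mirrors ArrayFlags: a string-slice flag that can be repeated.
type arrayFlags []string

// String renders the collected values (required by flag.Value).
func (f *arrayFlags) String() string { return strings.Join(*f, ", ") }

// Set appends a value each time the flag appears (required by flag.Value).
func (f *arrayFlags) Set(value string) error {
	*f = append(*f, value)
	return nil
}

func main() {
	var servers arrayFlags
	fs := flag.NewFlagSet("esbulk", flag.ContinueOnError)
	fs.Var(&servers, "server", "elasticsearch server, can be repeated")
	fs.Parse([]string{"-server", "http://localhost:9200", "-server", "http://localhost:9201"})
	fmt.Println(servers)
}
```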

type BulkResponse added in v0.4.1

type BulkResponse struct {
	Took      int    `json:"took"`
	HasErrors bool   `json:"errors"`
	Items     []Item `json:"items"`
}

BulkResponse is a response to a bulk request.

type Item added in v0.4.1

type Item struct {
	IndexAction struct {
		Index  string `json:"_index"`
		Type   string `json:"_type"`
		ID     string `json:"_id"`
		Status int    `json:"status"`
		Error  struct {
			Type      string `json:"type"`
			Reason    string `json:"reason"`
			IndexUUID string `json:"index_uuid"`
			Shard     string `json:"shard"`
			Index     string `json:"index"`
		} `json:"error"`
	} `json:"index"`
}

Item represents a bulk action.

type Options added in v0.3.0

type Options struct {
	Servers   []string
	Host      string // deprecated: Use Servers.
	Port      int    // deprecated: Use Servers.
	Index     string
	DocType   string
	BatchSize int
	Verbose   bool
	IDField   string
	Scheme    string // http or https; deprecated: Use Servers.
	Username  string
	Password  string
}

Options represents bulk indexing options.

func (*Options) SetServer added in v0.3.7

func (o *Options) SetServer(s string) error

SetServer parses out host and port from a string and sets the option values.

Directories

Path Synopsis
cmd
