core

package
v0.0.0-...-000ccc6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2014 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// Max buffer size in bytes before flushing to elasticsearch
	BulkMaxBuffer = 1048576
	// Max number of Docs to hold in buffer before forcing flush
	BulkMaxDocs = 100
	// Max delay before forcing a flush to Elasticearch
	BulkDelaySeconds = 5
	// Keep a running total of errors seen, since it is in the background
	BulkErrorCt uint64
	// maximum wait shutdown seconds
	MAX_SHUTDOWN_SECS = 5

	// There is one Global Bulk Indexor for convenience
	GlobalBulkIndexor *BulkIndexor
)
View Source
var (
	DebugRequests = false
)

Functions

func BulkIndexorGlobalRun

func BulkIndexorGlobalRun(maxConns int, done chan bool)

There is one global bulk indexor available for convenience so the IndexBulk() function can be called. However, the recommended usage is create your own BulkIndexor to allow for multiple seperate elasticsearch servers/host connections.

 @maxConns is the max number of in flight http requests
 @done is a channel to cause the indexor to stop

done := make(chan bool)
BulkIndexorGlobalRun(100, done)

func BulkSend

func BulkSend(buf *bytes.Buffer) error

This does the actual send of a buffer, which has already been formatted into bytes of ES formatted bulk data

func Delete

func Delete(pretty bool, index string, _type string, id string, version int, routing string) (api.BaseResponse, error)

Delete API allows to delete a typed JSON document from a specific index based on its id. http://www.elasticsearch.org/guide/reference/api/delete.html todo: add routing and versioning support

func DeleteByQuery

func DeleteByQuery(pretty bool, indices []string, types []string, query interface{}) (api.BaseResponse, error)

DeleteByQuery allows the caller to delete documents from one or more indices and one or more types based on a query. The query can either be provided using a simple query string as a parameter, or using the Query DSL defined within the request body. see: http://www.elasticsearch.org/guide/reference/api/delete-by-query.html

func Exists

func Exists(pretty bool, index string, _type string, id string) (bool, error)

Exists allows caller to check for the existance of a document using HEAD

func Explain

func Explain(pretty bool, index string, _type string, id string, query string) (api.Match, error)

Explain computes a score explanation for a query and a specific document. This can give useful feedback whether a document matches or didn’t match a specific query. This feature is available from version 0.19.9 and up. see http://www.elasticsearch.org/guide/reference/api/explain.html

func Get

func Get(pretty bool, index string, _type string, id string) (api.BaseResponse, error)

Get allows caller to get a typed JSON document from the index based on its id. GET - retrieves the doc HEAD - checks for existence of the doc http://www.elasticsearch.org/guide/reference/api/get.html TODO: make this implement an interface

func GetIndexUrl

func GetIndexUrl(index string, _type string, id string, parentId string, version int, op_type string,
	routing string, timestamp string, ttl int, percolate string, timeout string) (retval string, e error)

func Index

func Index(pretty bool, index string, _type string, id string, data interface{}) (api.BaseResponse, error)

Index adds or updates a typed JSON document in a specific index, making it searchable, creating an index if it did not exist. if id is omited, op_type 'create' will be pased and http method will default to "POST" id is optional parentId is optional version is optional op_type is optional routing is optional timestamp is optional ttl is optional percolate is optional timeout is optional http://www.elasticsearch.org/guide/reference/api/index_.html

func IndexBulk

func IndexBulk(index string, _type string, id string, date *time.Time, data interface{}) error

The index bulk API adds or updates a typed JSON document to a specific index, making it searchable. it operates by buffering requests, and ocassionally flushing to elasticsearch

This uses the one Global Bulk Indexor, you can also create your own non-global indexors and use the Index functions of that

http://www.elasticsearch.org/guide/reference/api/bulk.html

func IndexBulkTtl

func IndexBulkTtl(index string, _type string, id, ttl string, date *time.Time, data interface{}) error

The index bulk API adds or updates a typed JSON document to a specific index, making it searchable. it operates by buffering requests, and ocassionally flushing to elasticsearch.

This uses the one Global Bulk Indexor, you can also create your own non-global indexors and use the IndexTtl functions of that

http://www.elasticsearch.org/guide/reference/api/bulk.html

func IndexWithParameters

func IndexWithParameters(pretty bool, index string, _type string, id string, parentId string, version int, op_type string,
	routing string, timestamp string, ttl int, percolate string, timeout string, data interface{}) (api.BaseResponse, error)

IndexWithParameters takes all the potential parameters available

func MoreLikeThis

func MoreLikeThis(pretty bool, index string, _type string, id string, query MoreLikeThisQuery) (api.BaseResponse, error)

MoreLikeThis allows the caller to get documents that are “like” a specified document. http://www.elasticsearch.org/guide/reference/api/more-like-this.html

func Percolate

func Percolate(pretty bool, index string, _type string, name string, doc string) (api.Match, error)

func RegisterPercolate

func RegisterPercolate(pretty bool, index string, name string, query api.Query) (api.BaseResponse, error)

RegisterPercolate allows the caller to register queries against an index, and then send percolate requests which include a doc, and getting back the queries that match on that doc out of the set of registered queries. Think of it as the reverse operation of indexing and then searching. Instead of sending docs, indexing them, and then running queries. One sends queries, registers them, and then sends docs and finds out which queries match that doc. see http://www.elasticsearch.org/guide/reference/api/percolate.html

func Update

func Update(pretty bool, index string, _type string, id string) (api.BaseResponse, error)

Update updates a document based on a script provided. The operation gets the document (collocated with the shard) from the index, runs the script (with optional script language and parameters), and index back the result (also allows to delete, or ignore the operation). It uses versioning to make sure no updates have happened during the “get” and “reindex”. (available from 0.19 onwards). Note, this operation still means full reindex of the document, it just removes some network roundtrips and reduces chances of version conflicts between the get and the index. The _source field need to be enabled for this feature to work.

http://www.elasticsearch.org/guide/reference/api/update.html TODO: finish this, it's fairly complex

func UpdateBulk

func UpdateBulk(index string, _type string, id string, date *time.Time, data interface{}) error

func UpdateBulkTtl

func UpdateBulkTtl(index string, _type string, id, ttl string, date *time.Time, data interface{}) error

func Validate

func Validate(pretty bool, index string, _type string, query string, explain bool) (api.BaseResponse, error)

Validate allows a user to validate a potentially expensive query without executing it. see http://www.elasticsearch.org/guide/reference/api/validate.html

func WriteBulkBytes

func WriteBulkBytes(op string, index string, _type string, id, ttl string, date *time.Time, data interface{}) ([]byte, error)

Given a set of arguments for index, type, id, data create a set of bytes that is formatted for bulkd index http://www.elasticsearch.org/guide/reference/api/bulk.html

Types

type BulkIndexor

type BulkIndexor struct {

	// We are creating a variable defining the func responsible for sending
	// to allow a mock sendor for test purposes
	BulkSendor func(*bytes.Buffer) error

	// If we encounter an error in sending, we are going to retry for this long
	// before returning an error
	// if 0 it will not retry
	RetryForSeconds int

	// channel for getting errors
	ErrorChannel chan *ErrorBuffer

	// Buffer for Max number of time before forcing flush
	BufferDelayMax time.Duration
	// Max buffer size in bytes before flushing to elasticsearch
	BulkMaxBuffer int // 1048576
	// Max number of Docs to hold in buffer before forcing flush
	BulkMaxDocs int // 100
	// contains filtered or unexported fields
}

A bulk indexor creates goroutines, and channels for connecting and sending data to elasticsearch in bulk, using buffers.

Example (Errorchannel)

The simplest usage of background bulk indexing with error channel

package main

import (
	"fmt"
	"github.com/Insightpool/elastigo/core"
	"strconv"
)

func main() {
	indexor := core.NewBulkIndexorErrors(10, 60)
	done := make(chan bool)
	indexor.Run(done)

	go func() {
		for errBuf := range indexor.ErrorChannel {
			// just blissfully print errors forever
			fmt.Println(errBuf.Err)
		}
	}()
	for i := 0; i < 20; i++ {
		indexor.Index("twitter", "user", strconv.Itoa(i), "", nil, `{"name":"bob"}`)
	}
	done <- true
}
Output:

Example (Errorsmarter)

The simplest usage of background bulk indexing with error channel

package main

import (
	"fmt"
	"github.com/Insightpool/elastigo/core"
	"strconv"
	"time"
)

func main() {
	indexor := core.NewBulkIndexorErrors(10, 60)
	done := make(chan bool)
	indexor.Run(done)

	errorCt := 0 // use sync.atomic or something if you need
	timer := time.NewTicker(time.Minute * 3)
	go func() {
		for {
			select {
			case _ = <-timer.C:
				if errorCt < 2 {
					errorCt = 0
				}
			case _ = <-done:
				return
			}
		}
	}()

	go func() {
		for errBuf := range indexor.ErrorChannel {
			errorCt++
			fmt.Println(errBuf.Err)
			// log to disk?  db?   ????  Panic
		}
	}()
	for i := 0; i < 20; i++ {
		indexor.Index("twitter", "user", strconv.Itoa(i), "", nil, `{"name":"bob"}`)
	}
	done <- true // send shutdown signal
}
Output:

Example (Responses)

The inspecting the response

package main

import (
	"bytes"
	"fmt"
	"github.com/Insightpool/elastigo/api"
	"github.com/Insightpool/elastigo/core"
	"strconv"
)

func main() {
	indexor := core.NewBulkIndexor(10)
	// Create a custom Sendor Func, to allow inspection of response/error
	indexor.BulkSendor = func(buf *bytes.Buffer) error {
		// @buf is the buffer of docs about to be written
		respJson, err := api.DoCommand("POST", "/_bulk", buf)
		if err != nil {
			// handle it better than this
			fmt.Println(string(respJson))
		}
		return err
	}
	done := make(chan bool)
	indexor.Run(done)

	for i := 0; i < 20; i++ {
		indexor.Index("twitter", "user", strconv.Itoa(i), "", nil, `{"name":"bob"}`)
	}
	done <- true // send shutdown signal
}
Output:

Example (Simple)

The simplest usage of background bulk indexing

package main

import (
	"github.com/Insightpool/elastigo/core"
)

func main() {
	indexor := core.NewBulkIndexorErrors(10, 60)
	done := make(chan bool)
	indexor.Run(done)

	indexor.Index("twitter", "user", "1", "", nil, `{"name":"bob"}`)

	<-done // wait forever
}
Output:

func NewBulkIndexor

func NewBulkIndexor(maxConns int) *BulkIndexor

func NewBulkIndexorErrors

func NewBulkIndexorErrors(maxConns, retrySeconds int) *BulkIndexor

A bulk indexor with more control over error handling

 @maxConns is the max number of in flight http requests
 @retrySeconds is # of seconds to wait before retrying falied requests

done := make(chan bool)
BulkIndexorGlobalRun(100, done)

func (*BulkIndexor) Flush

func (b *BulkIndexor) Flush()

Flush all current documents to ElasticSearch

func (*BulkIndexor) Index

func (b *BulkIndexor) Index(index string, _type string, id, ttl string, date *time.Time, data interface{}) error

The index bulk API adds or updates a typed JSON document to a specific index, making it searchable. it operates by buffering requests, and ocassionally flushing to elasticsearch http://www.elasticsearch.org/guide/reference/api/bulk.html

func (*BulkIndexor) Run

func (b *BulkIndexor) Run(done chan bool)

Starts this bulk Indexor running, this Run opens a go routine so is Non blocking

func (*BulkIndexor) Update

func (b *BulkIndexor) Update(index string, _type string, id, ttl string, date *time.Time, data interface{}) error

type CountResponse

type CountResponse struct {
	Count int        `json:"count"`
	Shard api.Status `json:"_shards"`
}

func Count

func Count(pretty bool, index string, _type string) (CountResponse, error)

Count allows the caller to easily execute a query and get the number of matches for that query. It can be executed across one or more indices and across one or more types. The query can either be provided using a simple query string as a parameter, or using the Query DSL defined within the request body. http://www.elasticsearch.org/guide/reference/api/count.html TODO: take parameters. currently not working against 0.19.10

type DeleteByQueryResponse

type DeleteByQueryResponse struct {
	Status   bool                   `json:"ok"`
	Indicies map[string]IndexStatus `json:"_indices"`
}

type ErrorBuffer

type ErrorBuffer struct {
	Err error
	Buf *bytes.Buffer
}

type Explaination

type Explaination struct {
	Index string `json:"index"`
	Valid bool   `json:"valid"`
	Error string `json:"error"`
}

type Float32Nullable

type Float32Nullable float32

Elasticsearch returns some invalid (according to go) json, with floats having...

json: cannot unmarshal null into Go value of type float32 (see last field.)

"hits":{"total":6808,"max_score":null,

"hits":[{"_index":"10user","_type":"user","_id":"751820","_score":null,

func (*Float32Nullable) UnmarshalJSON

func (i *Float32Nullable) UnmarshalJSON(data []byte) error

type Hit

type Hit struct {
	Index  string          `json:"_index"`
	Type   string          `json:"_type,omitempty"`
	Id     string          `json:"_id"`
	Score  Float32Nullable `json:"_score,omitempty"` // Filters (no query) dont have score, so is null
	Source json.RawMessage `json:"_source"`          // marshalling left to consumer
	Fields json.RawMessage `json:"fields"`           // when a field arg is passed to ES, instead of _source it returns fields
}

type Hits

type Hits struct {
	Total int `json:"total"`
	//	MaxScore float32 `json:"max_score"`
	Hits []Hit `json:"hits"`
}

func (*Hits) Len

func (h *Hits) Len() int

type IndexStatus

type IndexStatus struct {
	Shards api.Status `json:"_shards"`
}

type MGetRequest

type MGetRequest struct {
	Index  string   `json:"_index"`
	Type   string   `json:"_type"`
	ID     string   `json:"_id"`
	IDS    []string `json:"_ids,omitempty"`
	Fields []string `json:"fields,omitempty"`
}

type MGetRequestContainer

type MGetRequestContainer struct {
	Docs []MGetRequest `json:"docs"`
}

type MGetResponseContainer

type MGetResponseContainer struct {
	Docs []api.BaseResponse `json:"docs"`
}

func MGet

func MGet(pretty bool, index string, _type string, req string) (MGetResponseContainer, error)

MGet allows the caller to get multiple documents based on an index, type (optional) and id (and possibly routing). The response includes a docs array with all the fetched documents, each element similar in structure to a document provided by the get API. see http://www.elasticsearch.org/guide/reference/api/multi-get.html

type MLT

type MLT struct {
	Fields              []string `json:"fields"`
	LikeText            string   `json:"like_text"`
	PercentTermsToMatch float32  `json:"percent_terms_to_match"`
	MinTermFrequency    int      `json:"min_term_freq"`
	MaxQueryTerms       int      `json:"max_query_terms"`
	StopWords           []string `json:"stop_words"`
	MinDocFrequency     int      `json:"min_doc_freq"`
	MaxDocFrequency     int      `json:"max_doc_freq"`
	MinWordLength       int      `json:"min_word_len"`
	MaxWordLength       int      `json:"max_word_len"`
	BoostTerms          int      `json:"boost_terms"`
	Boost               float32  `json:"boost"`
	Analyzer            string   `json:"analyzer"`
}

type MoreLikeThisQuery

type MoreLikeThisQuery struct {
	MoreLikeThis MLT `json:"more_like_this"`
}

type SearchResult

type SearchResult struct {
	Took        int             `json:"took"`
	TimedOut    bool            `json:"timed_out"`
	ShardStatus api.Status      `json:"_shards"`
	Hits        Hits            `json:"hits"`
	Facets      json.RawMessage `json:"facets,omitempty"` // structure varies on query
	ScrollId    string          `json:"_scroll_id,omitempty"`
}

func Scroll

func Scroll(pretty bool, scroll_id string, scroll string) (SearchResult, error)

func SearchRequest

func SearchRequest(pretty bool, index string, _type string, query interface{}, scroll string, scan int) (SearchResult, error)

SearchRequest performs a very basic search on an index via the request URI API.

params:

@pretty:  bool for pretty reply or not, a parameter to elasticsearch
@index:  the elasticsearch index
@_type:  optional ("" if not used) search specific type in this index
@query:  this can be one of 3 types:
           1)  string value that is valid elasticsearch
           2)  io.Reader that can be set in body (also valid elasticsearch string syntax..)
           3)  other type marshalable to json (also valid elasticsearch json)

out, err := SearchRequest(true, "github","",qryType ,"", 0)

http://www.elasticsearch.org/guide/reference/api/search/uri-request.html

func SearchUri

func SearchUri(index, _type string, query, scroll string, scan int) (SearchResult, error)

SearchUri performs the simplest possible query in url string params:

@index:  the elasticsearch index
@_type:  optional ("" if not used) search specific type in this index
@query:  valid string lucene search syntax

out, err := SearchUri("github","",`user:kimchy` ,"", 0)

produces a request like this: host:9200/github/_search?q=user:kimchy"

http://www.elasticsearch.org/guide/reference/api/search/uri-request.html

func (*SearchResult) String

func (s *SearchResult) String() string

type Validation

type Validation struct {
	Valid         bool           `json:"valid"`
	Shards        api.Status     `json:"_shards"`
	Explainations []Explaination `json:"explanations,omitempty"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL