docappender

package module
v2.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 10, 2025 License: Apache-2.0 Imports: 27 Imported by: 2

README

ci

go-docappender

go-docappender provides a Go API for append-only Elasticsearch document indexing.

License

This software is licensed under the Apache 2 license.

Design

go-docappender is an evolution of the Elastic APM Server Elasticsearch output, and was formerly known as modelindexer.

Prior to 8.0, APM Server used the libbeat Elasticsearch output. 8.0 introduced a new output called "modelindexer", which was coupled to the APM Server event data model and optimised for APM Server's usage. From 8.0 until 8.5, modelindexer processed events synchronously and used mutexes for synchronized writes to the cache. This worked well, but didn't seem to scale well on bigger instances with more CPUs.

flowchart LR;
    subgraph Goroutine
        Flush;
    end
    AgentA & AgentB-->Handler;
    subgraph Intake
    Handler<-->|semaphore|Decode
    Decode-->Batch;
    end
    subgraph ModelIndexer
    Available-.->Active;
    Batch-->Active;
    Active<-->|mutex|Cache;
    end
    Cache-->|FullOrTimer|Flush;

    Flush-->|bulk|ES[(Elasticsearch)];
    Flush-->|done|Available;

In APM Server 8.6.0, modelindexer was redesigned to accept events asynchronously, and run one or more "active indexers", which would each pull events from an in-memory queue and (by default) compress them and write them to a buffer. This approach reduced lock contention, and allowed for automatically scaling the number of active indexers up and down based on queue utilisation, with an upper bound based on the available memory.

flowchart LR;
    subgraph Goroutine11
        Flush1(Flush);
    end
    subgraph Goroutine22
        Flush2(Flush);
    end
    AgentA & AgentB-->Handler;
    subgraph Intake
    Handler<-->|semaphore|Decode
    Decode-->Batch;
    end
    subgraph ModelIndexer
    Batch-->Buffer;
    Available;
        subgraph Goroutine1
            Active1(Active);
            Active1(Active)<-->Cache1(Cache);
            Cache1(Cache)-->|FullOrTimer|Flush1(Flush);
        end
        subgraph Goroutine2
            Active2(Active);
            Active2(Active)<-->Cache2(Cache);
            Cache2(Cache)-->|FullOrTimer|Flush2(Flush);
        end
        subgraph Channel
            Buffer-->Active1(Active) & Active2(Active);
        end
        Available-.->Active1(Active) & Active2(Active);
    end

    Flush1(Flush) & Flush2(Flush)-->|bulk|ES[(Elasticsearch)];
    Flush1(Flush) & Flush2(Flush)-->|done|Available;

Releasing

We use GitHub releases to manage tagged releases, and aim to conform to semver in our release naming.

To create a new release, use the new release interface, and use GitHub's generate release notes to get an automatically-generated list of changes made since the last release.

Documentation

Overview

Package docappender provides an API for append-only bulk document indexing into Elasticsearch.

This package provides an intentionally simpler and more restrictive API than the go-elasticsearch/esutil.BulkIndexer API; it is not intended to cover all bulk API use cases. It is intended to be used for conflict-free, append-only indexing into Elasticsearch data streams.

Index

Constants

View Source
const (
	// Actions are all the actions that can be used when indexing data.
	// `create` will be used by default.
	ActionCreate = "create"
	ActionDelete = "delete"
	ActionIndex  = "index"
	ActionUpdate = "update"

	HeaderEventCount         = "X-Elastic-Event-Count"
	HeaderUncompressedLength = "X-Elastic-Uncompressed-Request-Length"
)

Variables

View Source
var (
	// ErrClosed is returned from methods of closed Indexers.
	ErrClosed = errors.New("model indexer closed")
)

Functions

This section is empty.

Types

type Appender

type Appender struct {
	// contains filtered or unexported fields
}

Appender provides an append-only API for bulk indexing documents into Elasticsearch.

Appender buffers documents in their JSON encoding until either the accumulated buffer reaches `config.FlushBytes`, or `config.FlushInterval` elapses.

Appender fills a single bulk request buffer at a time to ensure bulk requests are optimally sized, avoiding sparse bulk requests as much as possible. After a bulk request is flushed, the next document added will wait for the next available bulk request buffer and repeat the process.

Up to `config.MaxRequests` bulk requests may be flushing/active concurrently, to allow the server to make progress encoding while Elasticsearch is busy servicing flushed bulk requests.

func New

func New(client elastictransport.Interface, cfg Config) (*Appender, error)

New returns a new Appender that indexes documents into Elasticsearch. It is only tested with v8 go-elasticsearch client. Use other clients at your own risk.

func (*Appender) Add

func (a *Appender) Add(ctx context.Context, index string, document io.WriterTo) error

Add enqueues document for appending to index.

The document body will be copied to a buffer using io.Copy, and document may implement io.WriterTo to reduce overhead of copying.

The document io.WriterTo will be accessed after Add returns, and must remain accessible until its Read method returns EOF, or its WriterTo method returns.

func (*Appender) Close

func (a *Appender) Close(ctx context.Context) error

Close closes the indexer, first flushing any queued items.

Close returns an error if any flush attempts during the indexer's lifetime returned an error. If ctx is cancelled, Close returns and any ongoing flush attempts are cancelled.

func (*Appender) IndexersActive added in v2.11.0

func (a *Appender) IndexersActive() int64

type BulkIndexer

type BulkIndexer struct {
	// contains filtered or unexported fields
}

BulkIndexer issues bulk requests to Elasticsearch. It is NOT safe for concurrent use by multiple goroutines.

func NewBulkIndexer

func NewBulkIndexer(cfg BulkIndexerConfig) (*BulkIndexer, error)

NewBulkIndexer returns a bulk indexer that issues bulk requests to Elasticsearch. It is only tested with v8 go-elasticsearch client. Use other clients at your own risk. The returned BulkIndexer is NOT safe for concurrent use by multiple goroutines.

func (*BulkIndexer) Add

func (b *BulkIndexer) Add(item BulkIndexerItem) error

Add encodes an item in the buffer.

func (*BulkIndexer) BytesFlushed

func (b *BulkIndexer) BytesFlushed() int

BytesFlushed returns the number of bytes flushed by the bulk indexer.

func (*BulkIndexer) BytesUncompressedFlushed added in v2.1.0

func (b *BulkIndexer) BytesUncompressedFlushed() int

BytesUncompressedFlushed returns the number of uncompressed bytes flushed by the bulk indexer.

func (*BulkIndexer) Flush

Flush executes a bulk request if there are any items buffered, and clears out the buffer.

func (*BulkIndexer) Items

func (b *BulkIndexer) Items() int

Items returns the number of buffered items.

func (*BulkIndexer) Len

func (b *BulkIndexer) Len() int

Len returns the number of buffered bytes.

func (*BulkIndexer) Reset

func (b *BulkIndexer) Reset()

Reset resets bulk indexer, ready for a new request.

func (*BulkIndexer) SetClient added in v2.8.0

func (b *BulkIndexer) SetClient(client elastictransport.Interface)

SetClient resets the client used by the bulk indexer.

func (*BulkIndexer) UncompressedLen added in v2.1.0

func (b *BulkIndexer) UncompressedLen() int

UncompressedLen returns the number of uncompressed buffered bytes.

type BulkIndexerConfig

type BulkIndexerConfig struct {
	// Client holds the Elasticsearch client.
	Client elastictransport.Interface

	// MaxDocumentRetries holds the maximum number of document retries
	MaxDocumentRetries int

	// RetryOnDocumentStatus holds the document level statuses that will trigger a document retry.
	//
	// If RetryOnDocumentStatus is empty or nil, the default of [429] will be used.
	RetryOnDocumentStatus []int

	// CompressionLevel holds the gzip compression level, from 0 (gzip.NoCompression)
	// to 9 (gzip.BestCompression). Higher values provide greater compression, at a
	// greater cost of CPU. The special value -1 (gzip.DefaultCompression) selects the
	// default compression level.
	CompressionLevel int

	// Pipeline holds the ingest pipeline ID.
	//
	// If Pipeline is empty, no ingest pipeline will be specified in the Bulk request.
	Pipeline string

	// RequireDataStream, If set to true, an index will be created only if a
	// matching index template is found and it contains a data stream template.
	// When true, `require_data_stream=true` is set in the bulk request.
	// When false or not set, `require_data_stream` is not set in the bulk request.
	// Which could cause a classic index to be created if no data stream template
	// matches the index in the request.
	//
	// RequireDataStream is disabled by default.
	RequireDataStream bool

	// IncludeSourceOnError, if set to True, the response body of a Bulk Index request
	// might contain the part of source document on error.
	// If Unset the error reason will be dropped.
	// Requires Elasticsearch 8.18+ if value is True or False.
	// WARNING: if set to True, user is responsible for sanitizing the error as it may contain
	// sensitive data.
	//
	// IncludeSourceOnError is Unset by default
	IncludeSourceOnError Value

	// PopulateFailedDocsInput controls whether each BulkIndexerResponseItem.Input
	// in BulkIndexerResponseStat.FailedDocs is populated with the input of the item,
	// which includes the action line and the document line.
	//
	// WARNING: this is provided for testing and debugging only.
	// Use with caution as it may expose sensitive data; any clients
	// of go-docappender enabling this should relay this warning to
	// their users. Setting this will also add memory overhead.
	PopulateFailedDocsInput bool
}

BulkIndexerConfig holds configuration for BulkIndexer.

func BulkIndexerConfigFrom added in v2.8.0

func BulkIndexerConfigFrom(cl elastictransport.Interface, cfg Config) BulkIndexerConfig

BulkIndexerConfigFrom creates a BulkIndexerConfig from the provided Config, with additional information included as necessary.

func (BulkIndexerConfig) Validate added in v2.8.0

func (cfg BulkIndexerConfig) Validate() error

Validate checks the configuration for errors.

type BulkIndexerItem

type BulkIndexerItem struct {
	Index             string
	DocumentID        string
	Pipeline          string
	Action            string
	Body              io.WriterTo
	DynamicTemplates  map[string]string
	RequireDataStream bool
}

type BulkIndexerPool added in v2.8.0

type BulkIndexerPool struct {
	// contains filtered or unexported fields
}

BulkIndexerPool is a pool of BulkIndexer instances. It is designed to be used in a concurrent environment where multiple goroutines may need to acquire and release indexers.

The pool allows a minimum number of BulkIndexers to be guaranteed per ID, a maximum number of indexers per ID and an overall lease limit. This is useful to ensure the pool does not grow too large, even if some IDs are slow to release indexers.

func NewBulkIndexerPool added in v2.8.0

func NewBulkIndexerPool(guaranteed, max, total int, c BulkIndexerConfig) *BulkIndexerPool

NewBulkIndexerPool returns a new BulkIndexerPool with: - The specified guaranteed indexers per ID - The maximum number of concurrent BulkIndexers per ID - A total (max) number of indexers to be leased per pool. - The BulkIndexerConfig to use when creating new indexers.

func (*BulkIndexerPool) Deregister added in v2.8.0

func (p *BulkIndexerPool) Deregister(id string) <-chan *BulkIndexer

Deregister removes the id from the pool and returns a closed BulkIndexer channel with all the non-empty indexers associated with the ID.

func (*BulkIndexerPool) Get added in v2.8.0

func (p *BulkIndexerPool) Get(ctx context.Context, id string) (*BulkIndexer, error)

Get returns a BulkIndexer for the specified ID as the ID is registered and below the guaranteed minimum OR the local and overall limits.

If the overall limit of indexers has been reached, it will wait until a slot is available, blocking execution.

If Deregister is called while waiting, an error and nil indexer is returned.

func (*BulkIndexerPool) Put added in v2.8.0

func (p *BulkIndexerPool) Put(id string, indexer *BulkIndexer)

Put returns the BulkIndexer to the pool. If the indexer is non-empty, it is stored in the non-empty channel for the ID. Otherwise, it is returned to the general pool. After calling Put() no references to the indexer should be stored, since doing so may lead to undefined behavior and unintended memory sharing.

func (*BulkIndexerPool) Register added in v2.8.0

func (p *BulkIndexerPool) Register(id string)

Register adds an ID to the pool. If the ID already exists, it does nothing. This is useful for ensuring that the ID is registered before any indexers are acquired for it.

type BulkIndexerResponseItem

type BulkIndexerResponseItem struct {
	Index  string `json:"_index"`
	Status int    `json:"status"`

	Position int

	Error struct {
		Type   string `json:"type"`
		Reason string `json:"reason"`
	} `json:"error,omitempty"`

	Input string `json:"-"`
}

BulkIndexerResponseItem represents the Elasticsearch response item.

type BulkIndexerResponseStat

type BulkIndexerResponseStat struct {
	// Indexed contains the total number of successfully indexed documents.
	Indexed int64
	// RetriedDocs contains the total number of retried documents.
	RetriedDocs int64
	// FailureStoreDocs contains failure store specific document stats.
	FailureStoreDocs struct {
		// Used contains the total number of documents indexed to failure store.
		Used int64
		// Failed contains the total number of documents which failed when indexed to failure store.
		Failed int64
		// NotEnabled contains the total number of documents which could have been indexed to failure store
		// if it was enabled.
		NotEnabled int64
	}
	// GreatestRetry contains the greatest observed retry count in the entire
	// bulk request.
	GreatestRetry int
	// FailedDocs contains the failed documents.
	FailedDocs []BulkIndexerResponseItem
}

type Config

type Config struct {
	// Logger holds an optional Logger to use for logging indexing requests.
	//
	// All Elasticsearch errors will be logged at error level, so in cases
	// where the indexer is used for high throughput indexing, is recommended
	// that a rate-limited logger is used.
	//
	// If Logger is nil, logging will be disabled.
	Logger *zap.Logger

	// TracerProvider holds an optional otel TracerProvider for tracing
	// flush requests.
	//
	// If TracerProvider is nil, requests will not be traced.
	// To use this provider Tracer must be nil.
	TracerProvider trace.TracerProvider

	// CompressionLevel holds the gzip compression level, from 0 (gzip.NoCompression)
	// to 9 (gzip.BestCompression). Higher values provide greater compression, at a
	// greater cost of CPU. The special value -1 (gzip.DefaultCompression) selects the
	// default compression level.
	CompressionLevel int

	// MaxRequests holds the maximum number of bulk index requests to execute concurrently.
	// The maximum memory usage of Appender is thus approximately MaxRequests*FlushBytes.
	//
	// If MaxRequests is less than or equal to zero, the default of 10 will be used.
	MaxRequests int

	// BulkIndexerPool holds an optional pool that is used for creating new BulkIndexers.
	// If not set/nil, a new BulkIndexerPool will be created with MaxRequests as the
	// guaranteed, local and total maximum number of indexers.
	//
	// A BulkIndexerPool may be shared between multiple Appender instances. Each has its
	// own unique ID to guarantee per Appender limits.
	//
	// For more information, see [NewBulkIndexerPool].
	BulkIndexerPool *BulkIndexerPool

	// MaxDocumentRetries holds the maximum number of document retries
	MaxDocumentRetries int

	// RetryOnDocumentStatus holds the document level statuses that will trigger a document retry.
	//
	// If RetryOnDocumentStatus is empty or nil, the default of [429] will be used.
	RetryOnDocumentStatus []int

	// FlushBytes holds the flush threshold in bytes. If Compression is enabled,
	// The number of documents that can be buffered will be greater.
	//
	// If FlushBytes is zero, the default of 1MB will be used.
	FlushBytes int

	// FlushInterval holds the flush threshold as a duration.
	//
	// If FlushInterval is zero, the default of 30 seconds will be used.
	FlushInterval time.Duration

	// FlushTimeout holds the flush timeout as a duration.
	//
	// If FlushTimeout is zero, no timeout will be used.
	FlushTimeout time.Duration

	// DocumentBufferSize sets the number of documents that can be buffered before
	// they are stored in the active indexer buffer.
	//
	// If DocumentBufferSize is zero, the default 1024 will be used.
	DocumentBufferSize int

	// Pipeline holds the ingest pipeline ID.
	//
	// If Pipeline is empty, no ingest pipeline will be specified in the Bulk request.
	Pipeline string

	// RequireDataStream, If set to true, an index will be created only if a
	// matching index template is found and it contains a data stream template.
	// When true, `require_data_stream=true` is set in the bulk request.
	// When false or not set, `require_data_stream` is not set in the bulk request.
	// Which could cause a classic index to be created if no data stream template
	// matches the index in the request.
	//
	// RequireDataStream is disabled by default.
	RequireDataStream bool

	// IncludeSourceOnError, if set to True, the response body of a Bulk Index request
	// might contain the part of source document on error.
	// If Unset the error reason will be dropped.
	// Requires Elasticsearch 8.18+ if value is True or False.
	// WARNING: if set to True, user is responsible for sanitizing the error as it may contain
	// sensitive data.
	//
	// IncludeSourceOnError is Unset by default
	IncludeSourceOnError Value

	// PopulateFailedDocsInput controls whether each BulkIndexerResponseItem.Input
	// in BulkIndexerResponseStat.FailedDocs is populated with the input of the item,
	// which includes the action line and the document line.
	//
	// WARNING: this is provided for testing and debugging only.
	// Use with caution as it may expose sensitive data; any clients
	// of go-docappender enabling this should relay this warning to
	// their users. Setting this will also add memory overhead.
	PopulateFailedDocsInput bool

	// Scaling configuration for the docappender.
	//
	// If unspecified, scaling is enabled by default.
	Scaling ScalingConfig

	// MeterProvider holds the OTel MeterProvider to be used to create and
	// record appender metrics.
	//
	// If unset, the global OTel MeterProvider will be used, if that is unset,
	// no metrics will be recorded.
	MeterProvider metric.MeterProvider

	// MetricAttributes holds any extra attributes to set in the recorded
	// metrics.
	MetricAttributes attribute.Set
}

Config holds configuration for Appender.

func DefaultConfig added in v2.8.0

func DefaultConfig(cl elastictransport.Interface, cfg Config) Config

DefaultConfig returns a copy of cfg with any zero values set to their default values.

type ErrorFlushFailed added in v2.7.0

type ErrorFlushFailed struct {
	// contains filtered or unexported fields
}

func (ErrorFlushFailed) Error added in v2.7.0

func (e ErrorFlushFailed) Error() string

func (ErrorFlushFailed) ResponseBody added in v2.7.0

func (e ErrorFlushFailed) ResponseBody() string

func (ErrorFlushFailed) StatusCode added in v2.7.0

func (e ErrorFlushFailed) StatusCode() int

type FailureStoreStatus added in v2.6.1

type FailureStoreStatus string

FailureStoreStatus defines enumeration type for all known failure store statuses.

const (
	// FailureStoreStatusUnknown implicit status which represents that there is no information about
	// this response or that the failure store is not applicable.
	FailureStoreStatusUnknown FailureStoreStatus = "not_applicable_or_unknown"
	// FailureStoreStatusUsed status which represents that this document was stored in the failure store successfully.
	FailureStoreStatusUsed FailureStoreStatus = "used"
	// FailureStoreStatusFailed status which represents that this document was rejected from the failure store.
	FailureStoreStatusFailed FailureStoreStatus = "failed"
	// FailureStoreStatusNotEnabled status which represents that this document was rejected, but
	// it could have ended up in the failure store if it was enabled.
	FailureStoreStatusNotEnabled FailureStoreStatus = "not_enabled"
)

type ScaleActionConfig

type ScaleActionConfig struct {
	// Threshold is the number of consecutive times a scale up/down condition
	// has to happen for the scaling action will be triggered.
	Threshold uint

	// CoolDown is the amount of time needed to elapse between scaling actions
	// to trigger it.
	CoolDown time.Duration
}

ScaleActionConfig holds the configuration for a scaling action

type ScalingConfig

type ScalingConfig struct {
	// Disabled toggles active indexer scaling on.
	//
	// It is enabled by default.
	Disabled bool

	// ActiveRatio defines the threshold for (potential) active indexers to
	// GOMAXPROCS. The higher the number, the more potential active indexers
	// there will be actively pulling from the BulkIndexerItem channel.
	// For example, when ActiveRatio:1 and GOMAXPROCS:2, there can be a max
	// of 2 active indexers, or 1 per GOMAXPROCS.
	// If set to 0.5, the maximum number of active indexers is 1, since.
	// The value must be between 0 and 1.
	//
	// It defaults to 0.25 by default.
	ActiveRatio float64

	// ScaleDown configures the Threshold and CoolDown for the scale down
	// action. In order to scale down an active indexer, the Threshold has
	// to be met after the CoolDown has elapsed. Scale down will only take
	// place if there are more than 1 active indexer.
	// Active indexers will be destroyed when they aren't needed anymore,
	// when enough timed flushes (FlushInterval) are performed by an active
	// indexer (controlled by Threshold), or when an active indexer is idle
	// for (IdleInterval * Threshold) as long as CoolDown allows it.
	//
	// When unset, the default of 30 is used for Threshold, and 30 seconds for
	// CoolDown.
	ScaleDown ScaleActionConfig

	// ScaleUp configures the Threshold and CoolDown for the scale up action.
	//
	// In order for a scale up to occur, the Threshold has to be met after
	// the CoolDown has elapsed. By default, a single active indexer is created
	// which actively pulls items from the internal buffered queue. When enough
	// full flushes (FlushBytes) are performed by an active indexer (controlled
	// by Threshold), a new active indexer will be created until GOMAXPROCS / 4
	// is reached (25% of CPU capacity) if the CoolDown allows it.
	//
	// When unspecified, the default of 60 is used for Threshold, and 60 seconds
	// for CoolDown.
	ScaleUp ScaleActionConfig

	// IdleInterval defines how long an active indexer performs an inactivity
	// check. The ScaleDown.Threshold and ScaleDown.CoolDown needs to be met
	// for an active indexer to be destroyed.
	//
	// When unspecified, the default of 30 seconds will be used.
	IdleInterval time.Duration
}

ScalingConfig holds the docappender autoscaling configuration.

type Value added in v2.7.0

type Value int
const (
	Unset Value = iota
	True
	False
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL