Published: Jul 2, 2019 License: Apache-2.0



package batchInserter provides a wrapper around the db.Inserter to provide a way to group records together before inserting, in order to decrease database requests needed for inserting.



const (
	InsertingQueueDepth            = "inserting_queue_depth"
	DroppedEventsFromDbFailCounter = "dropped_events_db_fail_count"


func Metrics

func Metrics() []xmetrics.Metric


type BatchInserter

type BatchInserter struct {
	// contains filtered or unexported fields

BatchInserter manages batching events that need to be inserted, ensuring that an event that needs to be inserted isn't waiting for longer than a set period of time and that each batch doesn't pass a specified size.

func NewBatchInserter

func NewBatchInserter(config Config, logger log.Logger, metricsRegistry provider.Provider, inserter db.Inserter) (*BatchInserter, error)

NewBatchInserter creates a BatchInserter with the given values, ensuring that the configuration and other values given are valid. If configuration values aren't valid, a default value is used.

func (*BatchInserter) Insert

func (b *BatchInserter) Insert(record db.Record)

Insert adds the event to the queue inside of BatchInserter, preparing for it to be inserted. This can block, if the queue is full.

func (*BatchInserter) Start

func (b *BatchInserter) Start()

Start starts the batcher, which pulls from the queue inside of the BatchInserter.

func (*BatchInserter) Stop

func (b *BatchInserter) Stop()

Stop closes the internal queue and waits for the workers to finish processing what has already been added. This can block as it waits for everything to stop. After Stop() is called, Insert() should not be called again, or there will be a panic. TODO: ensure consumers can't cause a panic?

type Config

type Config struct {
	MaxWorkers       int
	MaxBatchSize     int
	MaxBatchWaitTime time.Duration
	QueueSize        int

Config holds the configuration values for a batch inserter.

type Measures

type Measures struct {
	InsertingQueue               metrics.Gauge
	DroppedEventsFromDbFailCount metrics.Counter

func NewMeasures

func NewMeasures(p provider.Provider) *Measures

NewMeasures constructs a Measures given a go-kit metrics Provider

