batchinsert

package module
v0.0.0-...-4b2ed9f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2019 License: MIT Imports: 8 Imported by: 0

README

go-clickhouse-batchinsert

go client (batch insert) base on https://github.com/kshvakov/clickhouse/

Clickhouse client for batch insert

Ref to https://clickhouse.yandex/docs/en/introduction/performance/

We recommend inserting data in packets of at least 1000 rows, or no more than a single request per second.

We better insert data in bulk, clickhouse-client can do bulk insert like this

    var (
      tx, _   = connect.Begin()
      stmt, _ = tx.Prepare("...")
    )
    defer stmt.Close()

    for _, obj := objs {
      if _, err := stmt.Exec(...); err != nil {
        log.Fatal(err)
      }
    }

    if err := tx.Commit(); err != nil {
      log.Fatal(err)
    }

So we only need to cache objs, and flush them at a fixed time or a fixed amount.

Example
        
import batchinsert "github.com/MaruHyl/go-clickhouse-batchinsert"

func ExampleBatchInsert() {
	//
	createTable := `
			CREATE TABLE IF NOT EXISTS test (
				x        UInt8
			) engine=Memory`
	insertSql := "INSERT INTO test (x) values (?)"
	//
	b, err := batchinsert.New(
		insertSql,
		batchinsert.WithHost("127.0.0.1:9000"),
	)
	if err != nil {
		panic(err)
	}
	//
	_, err = b.DB().Exec(createTable)
	if err != nil {
		panic(err)
	}
	//
	b.Insert(1)
	b.Insert(2)
	b.Insert(3)
	//
	_, err = b.DB().Exec(`DROP TABLE IF EXISTS test`)
	if err != nil {
		panic(err)
	}
	err = b.Close()
	if err != nil {
		panic(err)
	}

	// Output:
}

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrClosed = errors.New("batch insert is closed")

Functions

func GetUrl

func GetUrl(opts options) string

Types

type BatchInsert

type BatchInsert struct {
	// contains filtered or unexported fields
}
Example
package main

import (
	batchinsert "github.com/MaruHyl/go-clickhouse-batchinsert"
)

func main() {
	//
	createTable := `
			CREATE TABLE IF NOT EXISTS test (
				x        UInt8
			) engine=Memory`
	insertSql := "INSERT INTO test (x) values (?)"
	//
	b, err := batchinsert.New(
		insertSql,
		batchinsert.WithHost("127.0.0.1:9000"),
	)
	if err != nil {
		panic(err)
	}
	//
	_, err = b.DB().Exec(createTable)
	if err != nil {
		panic(err)
	}
	//
	_ = b.Insert(1)
	_ = b.Insert(2)
	_ = b.Insert(3)
	//
	_, err = b.DB().Exec(`DROP TABLE IF EXISTS test`)
	if err != nil {
		panic(err)
	}
	err = b.Close()
	if err != nil {
		panic(err)
	}

}
Output:

func New

func New(insertSql string, opts ...Option) (*BatchInsert, error)

func (*BatchInsert) Close

func (b *BatchInsert) Close() error

func (*BatchInsert) DB

func (b *BatchInsert) DB() *sql.DB

func (*BatchInsert) Insert

func (b *BatchInsert) Insert(values ...interface{}) (err error)

func (*BatchInsert) Len

func (b *BatchInsert) Len() int

type ConnectionOpenStrategy

type ConnectionOpenStrategy string
const OpenStrategyInOrder ConnectionOpenStrategy = "in_order"
const OpenStrategyRandom ConnectionOpenStrategy = "random"

type Logger

type Logger interface {
	Log(keyAndValues ...interface{})
}

type Option

type Option func(opt *options)

func WithAltHosts

func WithAltHosts(altHosts string) Option

Comma separated list of single address host for load-balancing

func WithBlockSize

func WithBlockSize(blockSize int) Option

Maximum rows in block (default is 1000000). If the rows are larger then the data will be split into several blocks to send them to the server

func WithConnectionOpenStrategy

func WithConnectionOpenStrategy(connectionOpenStrategy ConnectionOpenStrategy) Option

Random/in_order (default random)

func WithDatabase

func WithDatabase(db string) Option

Select the current default database

func WithDebug

func WithDebug(debug bool) Option

Enable debug output

func WithFlushPeriod

func WithFlushPeriod(flushPeriod time.Duration) Option

func WithHost

func WithHost(host string) Option

func WithLogger

func WithLogger(logger Logger) Option

func WithMaxBatchSize

func WithMaxBatchSize(maxBatchSize int) Option

func WithNoDelay

func WithNoDelay(noDelay bool) Option

Disable/enable the Nagle Algorithm for tcp socket (default is 'true' - disable)

func WithPoolSize

func WithPoolSize(poolSize int) Option

Maximum amount of pre-allocated byte chunks used in queries (default is 100). Decrease this if you experience memory problems at the expense of more GC pressure and vice versa.

func WithReadTimeout

func WithReadTimeout(readTimeout int) Option

Timeout in second

func WithUserInfo

func WithUserInfo(userName, password string) Option

Auth credentials

func WithWriteTimeOut

func WithWriteTimeOut(writeTimeOut int) Option

Timeout in second

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL