clickhousebuffer

Published: Jul 12, 2022 License: Apache-2.0 Imports: 6 Imported by: 0

README


Clickhouse Buffer

An easy-to-use, powerful, and productive package for writing data to the Clickhouse columnar database

Install

  • for go-clickhouse v1 $ go get -u github.com/zikwall/clickhouse-buffer
  • for go-clickhouse v2 $ go get -u github.com/zikwall/clickhouse-buffer/v3

Why

When using the Clickhouse database in real projects, you often end up reinventing the wheel: building your own queues and buffers that accumulate data until a certain amount has been collected, or a certain period of time has passed, and then send one large batch to the Clickhouse database.

This is because Clickhouse is designed to ingest new data most efficiently in large batches, which is exactly what its authors recommend.

Features

  • non-blocking - (recommended) the async write client uses implicit batching. Data is asynchronously written to the underlying buffer and automatically sent to the server when the write buffer reaches the batch size (default 5000) or the flush interval (default 1s) times out. The asynchronous write client is recommended for frequent periodic writes.
  • blocking - the synchronous write client writes data directly to Clickhouse and leaves error handling to the caller.

Client buffer engines:

  • in-memory - use native channels and slices
  • redis - use redis server as queue and buffer
  • in-memory-sync - a synchronized in-memory buffer that avoids data races when the buffer is accessed directly
  • retries - resends packets that were broken or for some reason not delivered

Usage

import (
    "database/sql"
    "time"

    "github.com/ClickHouse/clickhouse-go/v2"
    "github.com/ClickHouse/clickhouse-go/v2/lib/driver"

    "github.com/zikwall/clickhouse-buffer/v3/src/db/cxnative"
    "github.com/zikwall/clickhouse-buffer/v3/src/db/cxsql"
)

// if you already have a connection to Clickhouse, you can just use the wrappers:
// with the native interface, passing your existing driver.Conn
ch := cxnative.NewClickhouseWithConn(driver.Conn)
// or with the database/sql interface, passing your existing *sql.DB
ch := cxsql.NewClickhouseWithConn(*sql.DB)
// if you don't want to create connections yourself,
// the package can do it for you, just call the connection option you need:

// with native interface
ch, conn, err := cxnative.NewClickhouse(ctx, &clickhouse.Options{
        Addr: ctx.StringSlice("clickhouse-host"),
        Auth: clickhouse.Auth{
            Database:  ctx.String("clickhouse-database"),
            Username:  ctx.String("clickhouse-username"),
            Password:  ctx.String("clickhouse-password"),
        },
        Settings: clickhouse.Settings{
            "max_execution_time": 60,
        },
        DialTimeout: 5 * time.Second,
        Compression: &clickhouse.Compression{
            Method: clickhouse.CompressionLZ4,
        },
        Debug: ctx.Bool("debug"),
})
// or with database/sql interface
ch, conn, err := cxsql.NewClickhouse(ctx, &clickhouse.Options{
        Addr: ctx.StringSlice("clickhouse-host"),
        Auth: clickhouse.Auth{
            Database:  ctx.String("clickhouse-database"),
            Username:  ctx.String("clickhouse-username"),
            Password:  ctx.String("clickhouse-password"),
        },
        Settings: clickhouse.Settings{
            "max_execution_time": 60,
        },
        DialTimeout: 5 * time.Second,
        Compression: &clickhouse.Compression{
            Method: clickhouse.CompressionLZ4,
        },
        Debug: ctx.Bool("debug"),
}, &cxsql.RuntimeOptions{})
Create main data streamer client and write data
import (
    clickhousebuffer "github.com/zikwall/clickhouse-buffer/v3"
    "github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxmem"
    "github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxredis"
    "github.com/zikwall/clickhouse-buffer/v3/src/cx"
    "github.com/zikwall/clickhouse-buffer/v3/src/db/cxnative"
)
// create root client
client := clickhousebuffer.NewClientWithOptions(ctx, ch,
    clickhousebuffer.DefaultOptions().SetFlushInterval(1000).SetBatchSize(5000),
)
// create buffer engine
buffer := cxmem.NewBuffer(
    client.Options().BatchSize(),
)
// or use redis, passing your existing *redis.Client
buffer := cxredis.NewBuffer(
    ctx, *redis.Client, "bucket", client.Options().BatchSize(),
)
// create new writer api: table name with columns
writeAPI := client.Writer(
	cx.NewView("clickhouse_database.clickhouse_table", []string{"id", "uuid", "insert_ts"}), 
	buffer,
)

// define your custom data structure
type MyCustomDataView struct {
	id       int
	uuid     string
	insertTS time.Time
}
// and implement cx.Vectorable interface
func (t *MyCustomDataView) Row() cx.Vector {
	return cx.Vector{t.id, t.uuid, t.insertTS.Format(time.RFC822)}
}
// async write your data
writeAPI.WriteRow(&MyCustomDataView{
    id: 1, uuid: "1", insertTS: time.Now(),
})
// or use the safer TryWriteRow (same as WriteRow, but gracefully handles closed channels)
writeAPI.TryWriteRow(&MyCustomDataView{
    id: 1, uuid: "1", insertTS: time.Now(),
})
// or write a vector directly (faster, fewer allocations)
writeAPI.WriteVector(cx.Vector{
    1, "1", time.Now(),
})
// and its safe variant TryWriteVector
writeAPI.TryWriteVector(cx.Vector{
    1, "1", time.Now(),
})

When using the non-blocking writer, you can track errors through a dedicated error channel

errorsCh := writeAPI.Errors()
go func() {
	for err := range errorsCh {
		log.Warning(fmt.Sprintf("clickhouse write error: %s", err.Error()))
	}
}()
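
When you are finished writing, close the writer and then the client so that all ongoing asynchronous writes can finish (both Close methods are documented in the interface section below):

// finish pending asynchronous writes and release resources
writeAPI.Close()
client.Close()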

Using the blocking writer interface

// create new writer api: table name with columns
writerBlocking := client.WriterBlocking(cx.View{
    Name:    "clickhouse_database.clickhouse_table",
    Columns: []string{"id", "uuid", "insert_ts"},
})
// synchronous writing of data directly to Clickhouse
err := writerBlocking.WriteRow(ctx, []cx.Vectorable{
    &MyCustomDataView{
        id: 1, uuid: "1", insertTS: time.Now(),
    },
    &MyCustomDataView{
        id: 2, uuid: "2", insertTS: time.Now(),
    },
    &MyCustomDataView{
        id: 3, uuid: "3", insertTS: time.Now(),
    },
}...)
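
Since the blocking writer leaves error handling to the caller, check the returned error:

if err != nil {
    log.Printf("clickhouse blocking write failed: %v", err)
}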

More

Buffer engine:

You can implement your own data-buffer engine (file, RabbitMQ, custom in-memory storage, etc.) by implementing the Buffer interface:

type Buffer interface {
	Write(vec Vector)
	Read() []Vector
	Len() int
	Flush()
}
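
For illustration, here is a minimal sketch of a custom in-memory engine built on a plain slice. It is not part of the package, and it assumes that the Buffer interface above lives in the cx package imported in the earlier examples:

import "github.com/zikwall/clickhouse-buffer/v3/src/cx"

// sliceBuffer is a hypothetical engine that accumulates vectors in a slice.
type sliceBuffer struct {
	rows []cx.Vector
}

func NewSliceBuffer(size uint) cx.Buffer {
	return &sliceBuffer{rows: make([]cx.Vector, 0, size)}
}

// Write appends a single row to the buffer.
func (b *sliceBuffer) Write(vec cx.Vector) {
	b.rows = append(b.rows, vec)
}

// Read returns a copy of the accumulated rows.
func (b *sliceBuffer) Read() []cx.Vector {
	snapshot := make([]cx.Vector, len(b.rows))
	copy(snapshot, b.rows)
	return snapshot
}

// Len reports how many rows are currently buffered.
func (b *sliceBuffer) Len() int {
	return len(b.rows)
}

// Flush drops the accumulated rows, keeping the allocated capacity.
func (b *sliceBuffer) Flush() {
	b.rows = b.rows[:0]
}

Such an engine can then be passed to client.Writer in place of cxmem.NewBuffer.
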
Retries:

By default, packet resending is disabled. To enable it, call (*Options).SetRetryIsEnabled(true).

  • in-memory use channels (default)
  • redis
  • rabbitMQ
  • kafka

You can implement your own queue engine by implementing the Queueable interface:

type Queueable interface {
	Queue(packet *Packet)
	Retries() <-chan *Packet
}

and set it as an engine:

clickhousebuffer.DefaultOptions().SetDebugMode(true).SetRetryIsEnabled(true).SetQueueEngine(CustomQueueable)
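
As a sketch, a channel-backed queue engine could look like the following. The retry import path is an assumption based on the src/ layout of the other packages, and the implementation is purely illustrative:

import "github.com/zikwall/clickhouse-buffer/v3/src/retry"

// chanQueue is a hypothetical engine that keeps undelivered packets in a buffered channel.
type chanQueue struct {
	packets chan *retry.Packet
}

func NewChanQueue(size int) retry.Queueable {
	return &chanQueue{packets: make(chan *retry.Packet, size)}
}

// Queue stores a packet for a later resend attempt.
func (q *chanQueue) Queue(packet *retry.Packet) {
	q.packets <- packet
}

// Retries exposes the stored packets to the retry worker.
func (q *chanQueue) Retries() <-chan *retry.Packet {
	return q.packets
}

// clickhousebuffer.DefaultOptions().SetRetryIsEnabled(true).SetQueueEngine(NewChanQueue(100))
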
Logs:

You can plug in your own logger by implementing the Logger interface and passing it in the options:

type Logger interface {
	Log(message interface{})
	Logf(format string, v ...interface{})
}
// example with default options
clickhousebuffer.DefaultOptions().SetDebugMode(true).SetLogger(SomeLogger)
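
For example, a minimal logger backed by the standard library log package (a sketch, not part of the package):

import "log"

type stdLogger struct{}

// Log prints a single message.
func (stdLogger) Log(message interface{}) {
	log.Println(message)
}

// Logf prints a formatted message.
func (stdLogger) Logf(format string, v ...interface{}) {
	log.Printf(format, v...)
}

// clickhousebuffer.DefaultOptions().SetDebugMode(true).SetLogger(stdLogger{})
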
Tests:
  • $ go test -v ./... - run all tests, excluding the integration tests
  • $ go test -race -v ./... - run all tests, excluding the integration tests, with the race detector enabled
  • $ golangci-lint run --config ./.golangci.yml - check code quality with linters

Integration Tests:

export CLICKHOUSE_HOST=111.11.11.11:9000
export REDIS_HOST=111.11.11.11:6379
export REDIS_PASS=password_if_needed

$ go test -v ./... -tags=integration

or with the race detector enabled

$ go test -race -v ./... -tags=integration

Benchmarks

Ubuntu 20.04/Intel Core i7-8750H

goos: linux
goarch: amd64
pkg: github.com/zikwall/clickhouse-buffer/v3/bench
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
// memory

$ go test ./bench -bench=BenchmarkInsertSimplestPreallocateVectors -benchmem -benchtime=1000x

BenchmarkInsertSimplestPreallocateVectors/1000000-12                1000            142919 ns/op               0 B/op         0 allocs/op
BenchmarkInsertSimplestPreallocateVectors/100000-12                 1000             12498 ns/op               0 B/op         0 allocs/op
BenchmarkInsertSimplestPreallocateVectors/10000-12                  1000              1265 ns/op               0 B/op         0 allocs/op
BenchmarkInsertSimplestPreallocateVectors/1000-12                   1000               143.1 ns/op             0 B/op         0 allocs/op
BenchmarkInsertSimplestPreallocateVectors/100-12                    1000                 5.700 ns/op           2 B/op         0 allocs/op

$ go test ./bench -bench=BenchmarkInsertSimplestPreallocateObjects -benchmem -benchtime=1000x

BenchmarkInsertSimplestPreallocateObjects/1000000-12                1000            399110 ns/op           88000 B/op      3000 allocs/op
BenchmarkInsertSimplestPreallocateObjects/100000-12                 1000             37527 ns/op            8800 B/op       300 allocs/op
BenchmarkInsertSimplestPreallocateObjects/10000-12                  1000              3880 ns/op             880 B/op        30 allocs/op
BenchmarkInsertSimplestPreallocateObjects/1000-12                   1000               419.5 ns/op            88 B/op         3 allocs/op
BenchmarkInsertSimplestPreallocateObjects/100-12                    1000                58.90 ns/op           11 B/op         0 allocs/op

$ go test ./bench -bench=BenchmarkInsertSimplestObjects -benchmem -benchtime=1000x

BenchmarkInsertSimplestObjects/1000000-12                   1000            454794 ns/op          160002 B/op       4000 allocs/op
BenchmarkInsertSimplestObjects/100000-12                    1000             41879 ns/op           16000 B/op        400 allocs/op
BenchmarkInsertSimplestObjects/10000-12                     1000              4174 ns/op            1605 B/op         40 allocs/op
BenchmarkInsertSimplestObjects/1000-12                      1000               479.5 ns/op           160 B/op          4 allocs/op
BenchmarkInsertSimplestObjects/100-12                       1000                39.40 ns/op           16 B/op          0 allocs/op

$ go test ./bench -bench=BenchmarkInsertSimplestVectors -benchmem -benchtime=1000x

BenchmarkInsertSimplestVectors/1000000-12                   1000            182548 ns/op           72002 B/op       1000 allocs/op
BenchmarkInsertSimplestVectors/100000-12                    1000             16291 ns/op            7200 B/op        100 allocs/op
BenchmarkInsertSimplestVectors/10000-12                     1000              1638 ns/op             725 B/op         10 allocs/op
BenchmarkInsertSimplestVectors/1000-12                      1000               208.4 ns/op            72 B/op          1 allocs/op
BenchmarkInsertSimplestVectors/100-12                       1000                20.00 ns/op            7 B/op          0 allocs/op

$ go test ./bench -bench=BenchmarkInsertSimplestEmptyVectors -benchmem -benchtime=1000x

BenchmarkInsertSimplestEmptyVectors/1000000-12              1000            132887 ns/op           24002 B/op          0 allocs/op
BenchmarkInsertSimplestEmptyVectors/100000-12               1000             13404 ns/op            2400 B/op          0 allocs/op
BenchmarkInsertSimplestEmptyVectors/10000-12                1000              1299 ns/op             245 B/op          0 allocs/op
BenchmarkInsertSimplestEmptyVectors/1000-12                 1000               122.1 ns/op             0 B/op          0 allocs/op
BenchmarkInsertSimplestEmptyVectors/100-12                  1000                 6.800 ns/op           0 B/op          0 allocs/op

// redis

$ go test ./bench -bench=BenchmarkInsertRedisObjects -benchmem -benchtime=100x

BenchmarkInsertRedisObjects/1000-12                      100          22404356 ns/op           96095 B/op       2322 allocs/op
BenchmarkInsertRedisObjects/100-12                       100           2243544 ns/op            9673 B/op        233 allocs/op
BenchmarkInsertRedisObjects/10-12                        100            271749 ns/op            1033 B/op         25 allocs/op

$ go test ./bench -bench=BenchmarkInsertRedisVectors -benchmem -benchtime=100x

BenchmarkInsertRedisVectors/1000-12                  100          22145258 ns/op           92766 B/op       2274 allocs/op
BenchmarkInsertRedisVectors/100-12                   100           2320692 ns/op            9339 B/op        229 allocs/op
BenchmarkInsertRedisVectors/10-12                    100            202146 ns/op             157 B/op          2 allocs/op

MacBook Pro M1

goos: darwin
goarch: arm64
pkg: github.com/zikwall/clickhouse-buffer/v3/bench
$ go test ./bench -bench=BenchmarkInsertSimplestPreallocateVectors -benchmem -benchtime=1000x

BenchmarkInsertSimplestPreallocateVectors/1000000-8         	    1000	    206279 ns/op	       0 B/op	       0 allocs/op
BenchmarkInsertSimplestPreallocateVectors/100000-8          	    1000	     24612 ns/op	       0 B/op	       0 allocs/op
BenchmarkInsertSimplestPreallocateVectors/10000-8           	    1000	      2047 ns/op	       0 B/op	       0 allocs/op
BenchmarkInsertSimplestPreallocateVectors/1000-8            	    1000	       204.0 ns/op	       0 B/op	       0 allocs/op
BenchmarkInsertSimplestPreallocateVectors/100-8             	    1000	        22.83 ns/op	       0 B/op	       0 allocs/op

$ go test ./bench -bench=BenchmarkInsertSimplestPreallocateObjects -benchmem -benchtime=1000x

BenchmarkInsertSimplestPreallocateObjects/1000000-8         	    1000	    410757 ns/op	   88000 B/op	    3000 allocs/op
BenchmarkInsertSimplestPreallocateObjects/100000-8          	    1000	     40885 ns/op	    8800 B/op	     300 allocs/op
BenchmarkInsertSimplestPreallocateObjects/10000-8           	    1000	      4059 ns/op	     880 B/op	      30 allocs/op
BenchmarkInsertSimplestPreallocateObjects/1000-8            	    1000	       407.2 ns/op	      88 B/op	       3 allocs/op
BenchmarkInsertSimplestPreallocateObjects/100-8             	    1000	        46.29 ns/op	      11 B/op	       0 allocs/op

$ go test ./bench -bench=BenchmarkInsertSimplestObjects -benchmem -benchtime=1000x

BenchmarkInsertSimplestObjects/1000000-8         	    1000	    454083 ns/op	  160002 B/op	    4000 allocs/op
BenchmarkInsertSimplestObjects/100000-8          	    1000	     44329 ns/op	   16000 B/op	     400 allocs/op
BenchmarkInsertSimplestObjects/10000-8           	    1000	      4401 ns/op	    1360 B/op	      40 allocs/op
BenchmarkInsertSimplestObjects/1000-8            	    1000	       437.8 ns/op	     160 B/op	       4 allocs/op
BenchmarkInsertSimplestObjects/100-8             	    1000	        44.71 ns/op	      16 B/op	       0 allocs/op


$ go test ./bench -bench=BenchmarkInsertSimplestVectors -benchmem -benchtime=1000x

BenchmarkInsertSimplestVectors/1000000-8         	    1000	    244064 ns/op	   72002 B/op	    1000 allocs/op
BenchmarkInsertSimplestVectors/100000-8          	    1000	     24013 ns/op	    7200 B/op	     100 allocs/op
BenchmarkInsertSimplestVectors/10000-8           	    1000	      2335 ns/op	     725 B/op	      10 allocs/op
BenchmarkInsertSimplestVectors/1000-8            	    1000	       240.4 ns/op	      48 B/op	       1 allocs/op
BenchmarkInsertSimplestVectors/100-8             	    1000	        22.17 ns/op	       4 B/op	       0 allocs/op

$ go test ./bench -bench=BenchmarkInsertSimplestEmptyVectors -benchmem -benchtime=1000x

BenchmarkInsertSimplestEmptyVectors/1000000-8         	    1000	    215240 ns/op	   24002 B/op	       0 allocs/op
BenchmarkInsertSimplestEmptyVectors/100000-8          	    1000	     20736 ns/op	    2400 B/op	       0 allocs/op
BenchmarkInsertSimplestEmptyVectors/10000-8           	    1000	      2109 ns/op	       0 B/op	       0 allocs/op
BenchmarkInsertSimplestEmptyVectors/1000-8            	    1000	       198.3 ns/op	      24 B/op	       0 allocs/op
BenchmarkInsertSimplestEmptyVectors/100-8             	    1000	        19.83 ns/op	       2 B/op	       0 allocs/op

Conclusion:

  • the redis buffer is expectedly slower than the in-memory buffer: data has to be serialized and then deserialized back, which adds significant overhead on top of the network round trips
  • writing vectors causes less overhead (fewer allocations) and works faster
  • writing pre-allocated vectors is very fast and allocates no memory, which shows that buffering rows and then flushing them to Clickhouse adds almost no overhead
  • the same holds for writing objects, although with a small overhead
  • writing vectors is faster, allocates less memory, and is preferable to writing objects
  • an in-memory buffer is preferable to a redis buffer

TODO:

  • rewrite Buffer interface, simplify it
  • rewrite Options, simplify it
  • optimize the redis buffer and its encode/decode functions
  • buffer interfaces
  • more retry buffer interfaces
  • rewrite retry lib, simplify it
  • create binary app for streaming data to clickhouse
    • client and server with HTTP interface
    • client and server with gRPC interface

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client interface {
	// Options returns the options associated with client
	Options() *Options
	// WriteBatch sends a batch of data to Clickhouse; it is used implicitly by the
	// non-blocking writer and explicitly by the blocking writer
	WriteBatch(context.Context, cx.View, *cx.Batch) error
	// Writer returns the asynchronous, non-blocking, Writer client.
	// Ensures using a single Writer instance for each table pair.
	Writer(cx.View, cx.Buffer) Writer
	// WriterBlocking returns the synchronous, blocking, WriterBlocking client.
	// Ensures using a single WriterBlocking instance for each table pair.
	WriterBlocking(cx.View) WriterBlocking
	// RetryClient returns the retry client
	RetryClient() retry.Retryable
	// Close ensures all ongoing asynchronous write clients finish.
	Close()
}

Client is the main interface and provides the top-level API. The Client generates child Writers and stores all necessary configuration in itself. The Client owns an instance of a Clickhouse database connection and provides a retry.Retryable interface for re-processing packets.

func NewClient

func NewClient(ctx context.Context, clickhouse cx.Clickhouse) Client

NewClient creates an object implementing the Client interface with default options
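
For example, with the ch connection created in the Usage section above:

client := clickhousebuffer.NewClient(ctx, ch)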

func NewClientWithOptions

func NewClientWithOptions(ctx context.Context, clickhouse cx.Clickhouse, options *Options) Client

NewClientWithOptions is similar to NewClient, except that it accepts an Options object with the encapsulated settings inside. NewClientWithOptions returns an implementation of the Client interface.

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options holds write configuration properties

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions returns Options object with default values

func (*Options) BatchSize

func (o *Options) BatchSize() uint

BatchSize returns size of batch

func (*Options) FlushInterval

func (o *Options) FlushInterval() uint

FlushInterval returns flush interval in ms

func (*Options) SetBatchSize

func (o *Options) SetBatchSize(batchSize uint) *Options

SetBatchSize sets number of rows sent in single request

func (*Options) SetDebugMode

func (o *Options) SetDebugMode(isDebug bool) *Options

SetDebugMode enables or disables debug mode for logs and errors

func (*Options) SetFlushInterval

func (o *Options) SetFlushInterval(flushIntervalMs uint) *Options

SetFlushInterval sets the flush interval in ms after which the buffer is flushed if it has not already been written

func (*Options) SetLogger

func (o *Options) SetLogger(logger cx.Logger) *Options

SetLogger installs a custom implementation of the cx.Logger interface

func (*Options) SetQueueEngine

func (o *Options) SetQueueEngine(queue retry.Queueable) *Options

SetQueueEngine installs a custom implementation of the retry.Queueable interface

func (*Options) SetRetryIsEnabled

func (o *Options) SetRetryIsEnabled(enabled bool) *Options

SetRetryIsEnabled enables/disables resending of undelivered messages

type Writer

type Writer interface {
	// WriteRow asynchronously writes a row record into the buffer.
	WriteRow(vector cx.Vectorable)
	// TryWriteRow same as WriteRow, but with Channel Closing Principle (Gracefully Close Channels)
	TryWriteRow(vec cx.Vectorable)
	// WriteVector asynchronously writes a vector record into the buffer.
	WriteVector(vec cx.Vector)
	// TryWriteVector same as WriteVector, but with Channel Closing Principle (Gracefully Close Channels)
	TryWriteVector(vec cx.Vector)
	// Errors returns a channel for reading errors that occur during async writes.
	Errors() <-chan error
	// Close closes the writer
	Close()
}

Writer is a client interface with non-blocking methods for writing rows asynchronously, in batches, to a Clickhouse server. Writer can be used concurrently. When using multiple goroutines for writing, use a single Writer instance in all goroutines.
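
A sketch of sharing one writer across several goroutines (writeAPI is the Writer created in the README example above; the sync, fmt, and time imports are assumed):

var wg sync.WaitGroup
for w := 0; w < 4; w++ {
	wg.Add(1)
	go func(w int) {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			// all goroutines reuse the same Writer instance
			writeAPI.WriteVector(cx.Vector{w, fmt.Sprintf("%d-%d", w, i), time.Now()})
		}
	}(w)
}
wg.Wait()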

func NewWriter

func NewWriter(ctx context.Context, client Client, view cx.View, engine cx.Buffer) Writer

NewWriter returns new non-blocking write client for writing rows to Clickhouse table

type WriterBlocking

type WriterBlocking interface {
	// WriteRow writes row(s) into bucket.
	// WriteRow writes without implicit batching. Batch is created from given number of records
	// Non-blocking alternative is available in the Writer interface
	WriteRow(ctx context.Context, row ...cx.Vectorable) error
}

WriterBlocking is similar to Writer, except that its writes block. WriterBlocking does not take care of errors or repeated delivery of undelivered messages; all responsibility for error handling falls on the developer.

func NewWriterBlocking

func NewWriterBlocking(client Client, view cx.View) WriterBlocking

NewWriterBlocking returns a new WriterBlocking object
