clickhousebuffer

package module v1.0.2
Published: Jun 10, 2022 License: Apache-2.0 Imports: 14 Imported by: 6

README

clickhouse-buffer

Buffer for streaming data to ClickHouse

Install

  • $ go get -u github.com/zikwall/clickhouse-buffer

Why

In practice, when using Clickhouse in real projects, you often end up reinventing the wheel: writing custom queues and buffers that accumulate a certain amount of data, or data for a certain period of time, and then send one large batch to the Clickhouse database.

This is because Clickhouse is designed to ingest new data in large batches, as the authors themselves recommend.

Features

The client offers two ways of writing:
  • non-blocking
  • blocking

The non-blocking write client uses implicit batching. Data is written asynchronously to an underlying buffer and is automatically sent to the server when the buffer reaches the batch size (default 5000) or when the flush interval (default 1s) elapses.

The asynchronous write client is recommended for frequent periodic writes.

Client buffer interfaces:
  • in-memory
  • Redis

Writes are automatically retried on server back pressure.

It is also possible to resend "broken" packets that were not delivered for some reason. Packet resending is disabled by default; to enable it, call (*Options).SetRetryIsEnabled(true).

// example with default options
DefaultOptions().SetDebugMode(true).SetRetryIsEnabled(true)
Retry-queue engines:
  • in-memory queue (default)
  • Redis
  • RabbitMQ
  • Kafka

You can implement your own queue engine by defining the Queueable interface:

type Queueable interface {
	Queue(packet *retryPacket)
	Retries() <-chan *retryPacket
}

and set it as an engine:

DefaultOptions().SetDebugMode(true).SetRetryIsEnabled(true).SetQueueEngine(CustomQueueable)

Usage

First, implement the Inline interface with your own Row structure for formatting the data:

// implement
type MyTableRow struct {
	id       int
	uuid     string
	insertTS time.Time
}

func (t *MyTableRow) Row() RowSlice {
	return RowSlice{t.id, t.uuid, t.insertTS.Format(time.RFC822)}
}

Next, you need the Clickhouse interface; you can define your own component or use an existing implementation.

There are two ways:

  • create a connection to the Clickhouse database from connection parameters:
ch, err := clickhousebuffer.NewClickhouseWithOptions(ctx,
    &clickhousebuffer.ClickhouseCfg{
        Address:  ctx.String("clickhouse-address"),
        User:     ctx.String("clickhouse-user"),
        Password: ctx.String("clickhouse-password"),
        Database: ctx.String("clickhouse-database"),
        AltHosts: ctx.String("clickhouse-alt-hosts"),
        IsDebug:  ctx.Bool("debug"),
    },
    clickhousebuffer.WithMaxIdleConns(20),
    clickhousebuffer.WithMaxOpenConns(21),
    clickhousebuffer.WithConnMaxLifetime(time.Minute*5),
)
  • use an existing connection pool by providing an *sqlx.DB:
clickhouse, _ := clickhousebuffer.NewClickhouseWithSqlx(conn) // conn is an existing *sqlx.DB
Create the main data-streamer client and write data:
client := clickhousebuffer.NewClientWithOptions(ctx, clickhouse,
    clickhousebuffer.DefaultOptions().SetFlushInterval(1000).SetBatchSize(5000),
)

You can implement your own data buffer (File, RabbitMQ, CustomMemory, etc.) or use an existing one.

type Buffer interface {
	Write(vector RowSlice)
	Read() []RowSlice
	Len() int
	Flush()
}

Only the in-memory and Redis buffers are currently available.

// use a buffer implementing the interface
buffer := memory.NewBuffer(
	client.Options().BatchSize(),
)
// or a Redis-backed buffer
buffer := redis.NewBuffer(
	ctx, redisClient, "bucket", client.Options().BatchSize(),
)

Now we can write data to the necessary tables in an asynchronous, non-blocking way:

writeAPI := client.Writer(View{
    Name:    "clickhouse_database.clickhouse_table", 
    Columns: []string{"id", "uuid", "insert_ts"},
}, buffer)

// write your data
writeAPI.WriteRow(&MyTableRow{
    id: 1, uuid: "1", insertTS: time.Now(),
})

When using the non-blocking writer, you can track errors through a dedicated error channel:

errorsCh := writeAPI.Errors()
go func() {
	for err := range errorsCh {
		log.Warning(fmt.Sprintf("clickhouse write error: %s", err.Error()))
	}
}()

Using the blocking writer interface

writerBlocking := client.WriterBlocking(View{
    Name:    "clickhouse_database.clickhouse_table",
    Columns: []string{"id", "uuid", "insert_ts"},
})

err := writerBlocking.WriteRow(ctx, []Inline{
    &MyTableRow{id: 1, uuid: "1", insertTS: time.Now()},
    &MyTableRow{id: 2, uuid: "2", insertTS: time.Now()},
    &MyTableRow{id: 3, uuid: "3", insertTS: time.Now()},
}...)
Logs

You can plug in your own logger by implementing the Logger interface and passing it in the options:

type Logger interface {
	Log(message interface{})
	Logf(format string, v ...interface{})
}
// example with default options
DefaultOptions().SetDebugMode(true).SetLogger(SomeLogger)
TODO: log levels: info, warning, error, fatal
Tests
  • $ go test -v ./...
  • $ go test -v ./... -tags=integration
  • $ golangci-lint run --config ./.golangci.yml

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Clickhouse added in v0.0.2

type Clickhouse interface {
	Insert(context.Context, View, []buffer.RowSlice) (uint64, error)
	GetConnection() *sqlx.DB
	Close() error
}

func NewClickhouseWithOptions added in v0.0.2

func NewClickhouseWithOptions(ctx context.Context, cfg *ClickhouseCfg, options ...Option) (Clickhouse, error)

func NewClickhouseWithSqlx added in v0.0.2

func NewClickhouseWithSqlx(conn *sqlx.DB) (Clickhouse, error)

type ClickhouseCfg added in v0.0.2

type ClickhouseCfg struct {
	Address  string
	Password string
	User     string
	Database string
	AltHosts string
	IsDebug  bool
}

type ClickhouseImpl added in v0.0.2

type ClickhouseImpl struct {
	// contains filtered or unexported fields
}

func (*ClickhouseImpl) Close added in v0.0.2

func (ci *ClickhouseImpl) Close() error

func (*ClickhouseImpl) GetConnection added in v0.0.4

func (ci *ClickhouseImpl) GetConnection() *sqlx.DB

func (*ClickhouseImpl) Insert added in v0.0.2

func (ci *ClickhouseImpl) Insert(ctx context.Context, view View, rows []buffer.RowSlice) (uint64, error)

Insert. Currently, the client library does not support the JSONEachRow format, only native byte blocks. There is no support for user interfaces, nor for simple execution of an already prepared request. Batch writes are implemented through so-called "transactions"; Clickhouse does not actually support transactions, so this is only a client-side mechanism for preparing requests.

func (*ClickhouseImpl) SetInsertTimeout added in v0.0.2

func (ci *ClickhouseImpl) SetInsertTimeout(timeout uint)

type ClickhouseOpt added in v0.0.4

type ClickhouseOpt struct {
	// contains filtered or unexported fields
}

type Client added in v0.0.2

type Client interface {
	// Options returns the options associated with client
	Options() *Options
	// HandleStream processes incoming data and sends it to Clickhouse
	HandleStream(View, *buffer.Batch) error
	// WriteBatch sends data to Clickhouse; it is used implicitly by the non-blocking
	// writer and explicitly by the blocking writer
	WriteBatch(context.Context, View, *buffer.Batch) error
	// Writer returns the asynchronous, non-blocking, Writer client.
	// Ensures using a single Writer instance for each table pair.
	Writer(View, buffer.Buffer) Writer
	// WriterBlocking returns the synchronous, blocking, WriterBlocking client.
	// Ensures using a single WriterBlocking instance for each table pair.
	WriterBlocking(View) WriterBlocking
	// RetryClient Get retry client
	RetryClient() Retryable
	// Close ensures all ongoing asynchronous write clients finish.
	Close()
}

func NewClient

func NewClient(ctx context.Context, clickhouse Clickhouse) Client

func NewClientWithOptions

func NewClientWithOptions(ctx context.Context, clickhouse Clickhouse, options *Options) Client

type Closable added in v0.0.5

type Closable interface {
	Close() error
	CloseMessage() string
}

type Countable added in v0.0.5

type Countable interface {
	Inc() uint64
	Dec() uint64
	Val() uint64
}

type Logger added in v0.0.5

type Logger interface {
	Log(message interface{})
	Logf(format string, v ...interface{})
}

type Option added in v0.0.4

type Option func(e *ClickhouseOpt)

func WithConnMaxLifetime added in v0.0.4

func WithConnMaxLifetime(connMaxLifetime time.Duration) Option

WithConnMaxLifetime sets `connMaxLifetime` on ClickhouseOpt

func WithMaxIdleConns added in v0.0.4

func WithMaxIdleConns(maxIdleConns int) Option

WithMaxIdleConns sets `maxIdleConns` on ClickhouseOpt

func WithMaxOpenConns added in v0.0.4

func WithMaxOpenConns(maxOpenConns int) Option

WithMaxOpenConns sets `maxOpenConns` on ClickhouseOpt

type Options added in v0.0.2

type Options struct {
	// contains filtered or unexported fields
}

Options holds write configuration properties

func DefaultOptions added in v0.0.2

func DefaultOptions() *Options

DefaultOptions returns Options object with default values

func (*Options) BatchSize added in v0.0.2

func (o *Options) BatchSize() uint

BatchSize returns size of batch

func (*Options) FlushInterval added in v0.0.2

func (o *Options) FlushInterval() uint

FlushInterval returns flush interval in ms

func (*Options) SetBatchSize added in v0.0.2

func (o *Options) SetBatchSize(batchSize uint) *Options

SetBatchSize sets number of rows sent in single request

func (*Options) SetDebugMode added in v0.0.5

func (o *Options) SetDebugMode(isDebug bool) *Options

func (*Options) SetFlushInterval added in v0.0.2

func (o *Options) SetFlushInterval(flushIntervalMs uint) *Options

SetFlushInterval sets the flush interval in ms after which the buffer is flushed if it has not already been written

func (*Options) SetLogger added in v0.0.5

func (o *Options) SetLogger(logger Logger) *Options

func (*Options) SetQueueEngine added in v0.0.5

func (o *Options) SetQueueEngine(queue Queueable) *Options

func (*Options) SetRetryIsEnabled added in v0.0.5

func (o *Options) SetRetryIsEnabled(enabled bool) *Options

type Queueable added in v0.0.5

type Queueable interface {
	Queue(packet *retryPacket)
	Retries() <-chan *retryPacket
}

type Retryable added in v0.0.5

type Retryable interface {
	Retry(packet *retryPacket)
	Metrics() (uint64, uint64, uint64)
}

func NewRetry added in v0.0.5

func NewRetry(ctx context.Context, engine Queueable, writer Writeable, logger Logger, isDebug bool) Retryable

type View added in v0.0.2

type View struct {
	Name    string
	Columns []string
}

type Writeable added in v0.0.5

type Writeable interface {
	Write(ctx context.Context, view View, batch *buffer.Batch) (uint64, error)
}

func NewDefaultWriter added in v0.0.5

func NewDefaultWriter(conn Clickhouse) Writeable

type Writer added in v0.0.2

type Writer interface {
	// WriteRow asynchronously writes a record into the bucket.
	WriteRow(vector buffer.Inline)
	// Flush forces all pending writes from the buffer to be sent
	Flush()
	// Errors returns a channel for reading errors which occur during async writes.
	Errors() <-chan error
	// Close writer
	Close()
}

Writer is a client interface with non-blocking methods for asynchronously writing rows in batches to a Clickhouse server. Writer can be used concurrently; when writing from multiple goroutines, use a single Writer instance in all of them.

func NewWriter added in v0.0.2

func NewWriter(client Client, view View, buf buffer.Buffer, writeOptions *Options) Writer

NewWriter returns a new non-blocking write client for writing rows to a Clickhouse table

type WriterBlocking added in v0.0.2

type WriterBlocking interface {
	// WriteRow writes row(s) into the bucket.
	// WriteRow writes without implicit batching; a batch is created from the given records.
	// A non-blocking alternative is available in the Writer interface
	WriteRow(ctx context.Context, row ...buffer.Inline) error
}

func NewWriterBlocking added in v0.0.2

func NewWriterBlocking(streamer Client, view View) WriterBlocking

type WriterBlockingImpl added in v0.0.2

type WriterBlockingImpl struct {
	// contains filtered or unexported fields
}

func (*WriterBlockingImpl) WriteRow added in v0.0.2

func (w *WriterBlockingImpl) WriteRow(ctx context.Context, row ...buffer.Inline) error

type WriterImpl added in v0.0.2

type WriterImpl struct {
	// contains filtered or unexported fields
}

func (*WriterImpl) Close added in v0.0.2

func (w *WriterImpl) Close()

Close finishes outstanding write operations, stops background routines and closes all channels

func (*WriterImpl) Errors added in v0.0.2

func (w *WriterImpl) Errors() <-chan error

Errors returns a channel for reading errors which occur during async writes. Must be called before performing any writes for errors to be collected. The channel is unbuffered and must be drained, or the writer will block.

func (*WriterImpl) Flush added in v0.0.2

func (w *WriterImpl) Flush()

Flush forces all pending writes from the buffer to be sent

func (*WriterImpl) WriteRow added in v0.0.2

func (w *WriterImpl) WriteRow(rower buffer.Inline)

WriteRow asynchronously writes a record into the bucket. WriteRow adds the record to the buffer, which is sent in the background when it reaches the batch size.

