questdb

v3.2.0
Published: Aug 14, 2024 License: Apache-2.0 Imports: 24 Imported by: 1

README

go-questdb-client

Go client for QuestDB's InfluxDB Line Protocol (ILP) over HTTP and TCP. This library makes it easy to insert data into QuestDB.

The library requires Go 1.19 or newer.

Features:

  • Context-aware API.
  • Optimized for batch writes.
  • Supports TLS encryption and ILP authentication.
  • Automatic write retries and connection reuse for ILP over HTTP.
  • Tested against QuestDB 7.3.10 and newer versions.

New in v3:

  • Supports ILP over HTTP using the same client semantics

Documentation is available here.

Quickstart

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	qdb "github.com/questdb/go-questdb-client/v3"
)

func main() {
	ctx := context.TODO()
	// Connect to QuestDB running locally.
	sender, err := qdb.LineSenderFromConf(ctx, "http::addr=localhost:9000;")
	if err != nil {
		log.Fatal(err)
	}
	// Make sure to close the sender on exit to release resources.
	defer sender.Close(ctx)

	// Send a few ILP messages.
	tradedTs, err := time.Parse(time.RFC3339, "2022-08-06T15:04:05.123456Z")
	if err != nil {
		log.Fatal(err)
	}
	err = sender.
		Table("trades_go").
		Symbol("pair", "USDGBP").
		Symbol("type", "buy").
		Float64Column("traded_price", 0.83).
		Float64Column("limit_price", 0.84).
		Int64Column("qty", 100).
		At(ctx, tradedTs)
	if err != nil {
		log.Fatal(err)
	}

	tradedTs, err = time.Parse(time.RFC3339, "2022-08-06T15:04:06.987654Z")
	if err != nil {
		log.Fatal(err)
	}
	err = sender.
		Table("trades_go").
		Symbol("pair", "GBPJPY").
		Symbol("type", "sell").
		Float64Column("traded_price", 135.97).
		Float64Column("limit_price", 0.84).
		Int64Column("qty", 400).
		At(ctx, tradedTs)
	if err != nil {
		log.Fatal(err)
	}

	// Make sure that the messages are sent over the network.
	err = sender.Flush(ctx)
	if err != nil {
		log.Fatal(err)
	}
}

To connect via TCP, set the configuration string to:

	// ...
	sender, err := qdb.LineSenderFromConf(ctx, "tcp::addr=localhost:9009;")
	// ...

Pooled Line Senders

Warning: Experimental feature designed for use with HTTP senders ONLY

Version 3 of the client introduces a LineSenderPool, which provides a mechanism to pool previously-used LineSenders so they can be reused without having to allocate and instantiate new senders.

A LineSenderPool is thread-safe and can be used to concurrently obtain senders across multiple goroutines.

Since LineSenders must be used in a single-threaded context, a typical pattern is to Acquire a sender from a LineSenderPool at the beginning of a goroutine and use a deferred execution block to Close the sender at the end of the goroutine.

Here is an example of obtaining a sender from a LineSenderPool, returning it to the pool, and closing the pool:

package main

import (
	"context"

	qdb "github.com/questdb/go-questdb-client/v3"
)

func main() {
	ctx := context.TODO()

	pool, err := qdb.PoolFromConf("http::addr=localhost:9000")
	if err != nil {
		panic(err)
	}
	defer func() {
		err := pool.Close(ctx)
		if err != nil {
			panic(err)
		}
	}()

	sender, err := pool.Sender(ctx)
	if err != nil {
		panic(err)
	}

	err = sender.Table("prices").
		Symbol("ticker", "AAPL").
		Float64Column("price", 123.45).
		AtNow(ctx)
	if err != nil {
		panic(err)
	}

	// Close call returns the sender back to the pool
	if err := sender.Close(ctx); err != nil {
		panic(err)
	}
}

Migration from v2

v2 code example:

package main

import (
	"context"

	qdb "github.com/questdb/go-questdb-client/v2"
)

func main() {
	// Connect to QuestDB running on 127.0.0.1:9009 (ILP/TCP)
	sender, err := qdb.NewLineSender(context.TODO())
	// ...
	defer sender.Close()
	// ...
}

Migrated v3 code:

package main

import (
	"context"

	qdb "github.com/questdb/go-questdb-client/v3"
)

func main() {
	// Connect to QuestDB running on 127.0.0.1:9000 (ILP/HTTP)
	sender, err := qdb.NewLineSender(context.TODO(), qdb.WithHttp())
	// Alternatively, you can use the LineSenderFromConf function:
	// sender, err := qdb.LineSenderFromConf(ctx, "http::addr=localhost:9000;")
	// ...
	// or you can export the "http::addr=localhost:9000;" config string to
	// the QDB_CLIENT_CONF environment variable and use the LineSenderFromEnv function:
	// sender, err := qdb.LineSenderFromEnv(ctx)
	// ...
	defer sender.Close(context.TODO())
	// ...
}

Note that the migrated code uses the HTTP sender instead of the TCP one.

Community

If you need help, have additional questions, or want to provide feedback, you can find us on our Community Forum. You can also sign up to our mailing list to get notified of new releases.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type HttpError

type HttpError struct {
	Code    string `json:"code"`
	Message string `json:"message"`
	Line    int    `json:"line,omitempty"`
	ErrorId string `json:"errorId"`
	// contains filtered or unexported fields
}

HttpError is a server-sent error message.

func (*HttpError) Error

func (e *HttpError) Error() string

Error returns full error message string.

func (*HttpError) HttpStatus

func (e *HttpError) HttpStatus() int

HttpStatus returns error HTTP status code.

type InvalidConfigStrError

type InvalidConfigStrError struct {
	// contains filtered or unexported fields
}

InvalidConfigStrError is an error indicating an invalid config string.

func NewInvalidConfigStrError

func NewInvalidConfigStrError(msg string, args ...interface{}) *InvalidConfigStrError

NewInvalidConfigStrError creates a new InvalidConfigStrError.

func (*InvalidConfigStrError) Error

func (e *InvalidConfigStrError) Error() string

Error returns full error message string.

type LineSender

type LineSender interface {
	// Table sets the table name (metric) for a new ILP message. Should be
	// called before any Symbol or Column method.
	//
	// Table name cannot contain any of the following characters:
	// '\n', '\r', '?', ',', ''', '"', '\\', '/', ':', ')', '(', '+', '*',
	// '%', '~', starting '.', trailing '.', or a non-printable char.
	Table(name string) LineSender

	// Symbol adds a symbol column value to the ILP message. Should be called
	// before any Column method.
	//
	// Symbol name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', ''', '"', '\\', '/', ':', ')', '(', '+',
	// '-', '*', '%', '~', or a non-printable char.
	Symbol(name, val string) LineSender

	// Int64Column adds a 64-bit integer (long) column value to the ILP
	// message.
	//
	// Column name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', ''', '"', '\\', '/', ':', ')', '(', '+',
	// '-', '*', '%', '~', or a non-printable char.
	Int64Column(name string, val int64) LineSender

	// Long256Column adds a 256-bit unsigned integer (long256) column
	// value to the ILP message.
	//
	// Only non-negative numbers that fit into 256-bit unsigned integer are
	// supported and any other input value would lead to an error.
	//
	// Column name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', ''', '"', '\\', '/', ':', ')', '(', '+',
	// '-', '*', '%', '~', or a non-printable char.
	Long256Column(name string, val *big.Int) LineSender

	// TimestampColumn adds a timestamp column value to the ILP
	// message.
	//
	// Should be used only for non-designated timestamp column.
	// Designated timestamp column values should be passed to At/AtNow.
	//
	// Column name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', ''', '"', '\\', '/', ':', ')', '(', '+',
	// '-', '*', '%', '~', or a non-printable char.
	TimestampColumn(name string, ts time.Time) LineSender

	// Float64Column adds a 64-bit float (double) column value to the ILP
	// message.
	//
	// Column name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', ''', '"', '\\', '/', ':', ')', '(', '+',
	// '-', '*', '%', '~', or a non-printable char.
	Float64Column(name string, val float64) LineSender

	// StringColumn adds a string column value to the ILP message.
	//
	// Column name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', ''', '"', '\\', '/', ':', ')', '(', '+',
	// '-', '*', '%', '~', or a non-printable char.
	StringColumn(name, val string) LineSender

	// BoolColumn adds a boolean column value to the ILP message.
	//
	// Column name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', ''', '"', '\\', '/', ':', ')', '(', '+',
	// '-', '*', '%', '~', or a non-printable char.
	BoolColumn(name string, val bool) LineSender

	// At sets the designated timestamp value and finalizes the ILP
	// message.
	//
	// If the underlying buffer reaches configured capacity or the
	// number of buffered messages exceeds the auto-flush trigger, this
	// method also sends the accumulated messages.
	//
	// If ts.IsZero(), no timestamp is sent to the server.
	At(ctx context.Context, ts time.Time) error

	// AtNow omits designated timestamp value and finalizes the ILP
	// message. The server will insert each message using the system
	// clock as the row timestamp.
	//
	// If the underlying buffer reaches configured capacity or the
	// number of buffered messages exceeds the auto-flush trigger, this
	// method also sends the accumulated messages.
	AtNow(ctx context.Context) error

	// Flush sends the accumulated messages via the underlying
	// connection. Should be called periodically to make sure that
	// all messages are sent to the server.
	//
	// For optimal performance, this method should not be called after
	// each ILP message. Instead, the messages should be written in
	// batches followed by a Flush call. The optimal batch size may
	// vary from one thousand to a few thousand messages depending on
	// the message size.
	Flush(ctx context.Context) error

	// Close closes the underlying HTTP client.
	//
	// If auto-flush is enabled, the client will flush any remaining buffered
	// messages before closing itself.
	Close(ctx context.Context) error
}

LineSender allows you to insert rows into QuestDB by sending ILP messages over HTTP or TCP protocol.

Each sender corresponds to a single client-server connection. A sender should not be called concurrently by multiple goroutines.

HTTP senders reuse connections from a global pool by default. You can customize the HTTP transport by passing a custom http.Transport to the WithHttpTransport option.
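The symbol and column name rules listed in the interface comments can be illustrated with a standalone check. This is a minimal sketch, not the client's own validation: validColumnName and illegalColumnChars are hypothetical names, and rejecting empty names is an assumption made here.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// illegalColumnChars lists the characters that the LineSender comments
// forbid in symbol and column names.
const illegalColumnChars = "\n\r?.,'\"\\/:)(+-*%~"

// validColumnName reports whether name avoids every forbidden or
// non-printable character. Illustrative only; the real client performs
// its own validation.
func validColumnName(name string) bool {
	if name == "" {
		return false // assumption of this sketch: empty names are rejected
	}
	for _, r := range name {
		if strings.ContainsRune(illegalColumnChars, r) || !unicode.IsPrint(r) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(validColumnName("traded_price"))
	fmt.Println(validColumnName("limit-price"))
}
```

Table names follow a slightly different list (for example, '.' is only forbidden at the start or end), so a table-name check would need its own rule set.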

func LineSenderFromConf

func LineSenderFromConf(ctx context.Context, conf string) (LineSender, error)

LineSenderFromConf creates a LineSender using the QuestDB config string format.

Example config string: "http::addr=localhost;username=joe;password=123;auto_flush_rows=1000;"

QuestDB ILP clients use a common key-value configuration string format across all implementations. We opted for this config over a URL because it reduces the amount of character escaping required for paths and base64-encoded param values.

The config string format is as follows:

schema::key1=value1;key2=value2;key3=value3;

Schemas supported are "http", "https", "tcp", "tcps"
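To make the format concrete, here is a minimal sketch of how such a string breaks down into a schema and key-value pairs. parseConf is a hypothetical helper, not the client's parser: it assumes values never contain ';' and it does not validate keys or values.

```go
package main

import (
	"fmt"
	"strings"
)

// parseConf splits a "schema::key1=value1;key2=value2;" string into its
// schema and key-value pairs. Illustrative sketch only.
func parseConf(conf string) (string, map[string]string, error) {
	schema, rest, ok := strings.Cut(conf, "::")
	if !ok || schema == "" {
		return "", nil, fmt.Errorf("missing schema in %q", conf)
	}
	params := make(map[string]string)
	for _, pair := range strings.Split(rest, ";") {
		if pair == "" {
			continue // tolerate the trailing ';'
		}
		key, val, ok := strings.Cut(pair, "=")
		if !ok || key == "" {
			return "", nil, fmt.Errorf("malformed pair %q", pair)
		}
		params[key] = val
	}
	return schema, params, nil
}

func main() {
	schema, params, err := parseConf("http::addr=localhost:9000;auto_flush_rows=1000;")
	if err != nil {
		panic(err)
	}
	fmt.Println(schema, params["addr"], params["auto_flush_rows"])
}
```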

Options:

http(s) and tcp(s):

-------------------

addr: hostname/port of QuestDB endpoint

init_buf_size: initial growable ILP buffer size in bytes (defaults to 128KiB)

tls_verify: determines if TLS certificates should be validated (defaults to "on", can be set to "unsafe_off")

http(s)-only

------------

username: for basic authentication

password: for basic authentication

token: bearer token auth (used instead of basic authentication)

auto_flush: determines if auto-flushing is enabled (values "on" or "off", defaults to "on")

auto_flush_rows: auto-flushing is triggered above this row count (defaults to 75000). If set, explicitly implies auto_flush=on. Set to 'off' to disable.

auto_flush_interval: auto-flushing is triggered once this much time has passed, in milliseconds (defaults to 1000 milliseconds). If set, explicitly implies auto_flush=on. Set to 'off' to disable.

request_min_throughput: bytes per second, used to calculate each request's timeout (defaults to 100KiB/s)

request_timeout: minimum request timeout in milliseconds (defaults to 10 seconds)

retry_timeout: cumulative maximum millisecond duration spent in retries (defaults to 10 seconds)

max_buf_size: buffer growth limit in bytes. Client errors if breached (default is 100MiB)

tcp(s)-only

-----------

username: KID (key ID) for ECDSA authentication

token: Secret K (D) for ECDSA authentication

func LineSenderFromEnv added in v3.0.4

func LineSenderFromEnv(ctx context.Context) (LineSender, error)

LineSenderFromEnv creates a LineSender with a config string defined by the QDB_CLIENT_CONF environment variable. See LineSenderFromConf for the config string format.

This is a convenience method suitable for Cloud environments.

func NewLineSender

func NewLineSender(ctx context.Context, opts ...LineSenderOption) (LineSender, error)

NewLineSender creates a new InfluxDB Line Protocol (ILP) sender. Each sender corresponds to a single client connection. LineSender should not be called concurrently by multiple goroutines.

type LineSenderOption

type LineSenderOption func(*lineSenderConfig)

LineSenderOption defines line sender config option.

func WithAddress

func WithAddress(addr string) LineSenderOption

WithAddress sets address to connect to. Should be in the "host:port" format. Defaults to "127.0.0.1:9000" in case of HTTP and "127.0.0.1:9009" in case of TCP.

func WithAuth

func WithAuth(tokenId, token string) LineSenderOption

WithAuth sets token (private key) used for ILP authentication.

Only available for the TCP sender.

func WithAutoFlushDisabled

func WithAutoFlushDisabled() LineSenderOption

WithAutoFlushDisabled turns off auto-flushing behavior. To send ILP messages, the user must call Flush().

Only available for the HTTP sender.

func WithAutoFlushInterval

func WithAutoFlushInterval(interval time.Duration) LineSenderOption

WithAutoFlushInterval sets the interval at which the Sender automatically flushes its buffer. Defaults to 1 second.

Only available for the HTTP sender.

func WithAutoFlushRows

func WithAutoFlushRows(rows int) LineSenderOption

WithAutoFlushRows sets the number of buffered rows that must be breached in order to trigger an auto-flush. Defaults to 75000.

Only available for the HTTP sender.
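The two auto-flush triggers (row count and interval) can be pictured with a small stand-in buffer. This is only a sketch under stated assumptions: batcher is a hypothetical type, the triggers are checked when a row is added, and the ">=" comparisons are a guess at boundary behavior that the documentation does not spell out.

```go
package main

import (
	"fmt"
	"time"
)

// batcher is a hypothetical stand-in for a sender's buffer. It flushes
// when the buffered row count reaches maxRows or when interval has
// elapsed since the last flush, whichever comes first.
type batcher struct {
	maxRows   int
	interval  time.Duration
	rows      []string
	lastFlush time.Time
	flushed   int // number of flushes performed, for demonstration
}

// add buffers one row and flushes if either trigger fires.
func (b *batcher) add(row string, now time.Time) {
	b.rows = append(b.rows, row)
	if len(b.rows) >= b.maxRows || now.Sub(b.lastFlush) >= b.interval {
		b.flush(now)
	}
}

// flush empties the buffer; a real sender would send the rows here.
func (b *batcher) flush(now time.Time) {
	b.rows = b.rows[:0]
	b.lastFlush = now
	b.flushed++
}

// demo feeds four rows with synthetic timestamps and returns the
// number of flushes that were triggered.
func demo() int {
	start := time.Unix(0, 0)
	b := &batcher{maxRows: 3, interval: time.Second, lastFlush: start}
	b.add("r1", start)
	b.add("r2", start.Add(100*time.Millisecond))
	b.add("r3", start.Add(200*time.Millisecond))  // row-count trigger
	b.add("r4", start.Add(1500*time.Millisecond)) // interval trigger
	return b.flushed
}

func main() {
	fmt.Println(demo())
}
```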

func WithBasicAuth

func WithBasicAuth(user, pass string) LineSenderOption

WithBasicAuth sets a Basic authentication header for ILP requests over HTTP.

Only available for the HTTP sender.

func WithBearerToken

func WithBearerToken(token string) LineSenderOption

WithBearerToken sets a Bearer token Authentication header for ILP requests.

Only available for the HTTP sender.

func WithFileNameLimit

func WithFileNameLimit(limit int) LineSenderOption

WithFileNameLimit sets maximum file name length in chars allowed by the server. Affects maximum table and column name lengths accepted by the sender. Should be set to the same value as on the server. Defaults to 127.

func WithHttp

func WithHttp() LineSenderOption

WithHttp enables ingestion over HTTP protocol.

func WithHttpTransport

func WithHttpTransport(t *http.Transport) LineSenderOption

WithHttpTransport sets the client's http transport to the passed pointer instead of the global transport. This can be used for customizing the http transport used by the LineSender. For example to set custom timeouts, TLS settings, etc. WithTlsInsecureSkipVerify is ignored when this option is in use.

Only available for the HTTP sender.

func WithInitBufferSize

func WithInitBufferSize(sizeInBytes int) LineSenderOption

WithInitBufferSize sets the desired initial buffer capacity in bytes to be used when sending ILP messages. Defaults to 128KiB.

This setting is a soft limit, i.e. the underlying buffer may grow larger than the provided value.

func WithMaxBufferSize

func WithMaxBufferSize(sizeInBytes int) LineSenderOption

WithMaxBufferSize sets the maximum buffer capacity in bytes to be used when sending ILP messages. The sender will return an error if the limit is reached. Defaults to 100MiB.

Only available for the HTTP sender.

func WithMinThroughput

func WithMinThroughput(bytesPerSecond int) LineSenderOption

WithMinThroughput is used in combination with request_timeout to set the timeout of an ILP request. Defaults to 100KiB/s.

timeout = (request.len() / request_min_throughput) + request_timeout

Only available for the HTTP sender.

func WithRequestTimeout

func WithRequestTimeout(timeout time.Duration) LineSenderOption

WithRequestTimeout is used in combination with request_min_throughput to set the timeout of an ILP request. Defaults to 10 seconds.

timeout = (request.len() / request_min_throughput) + request_timeout

Only available for the HTTP sender.
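As a worked example of the timeout formula, the sketch below computes the timeout for a given request size. requestTimeout is a hypothetical helper written for illustration, not a function exported by the client; returning the base timeout when the throughput is not positive is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"time"
)

// requestTimeout applies the formula: the time needed to transfer size
// bytes at minThroughput bytes per second, plus the base timeout.
// Integer arithmetic is used; for buffers up to 100MiB the intermediate
// product stays well within the int64 range.
func requestTimeout(size, minThroughput int, base time.Duration) time.Duration {
	if minThroughput <= 0 {
		return base // assumption of this sketch: no throughput term
	}
	transfer := time.Duration(size) * time.Second / time.Duration(minThroughput)
	return transfer + base
}

func main() {
	// 1 MiB request, 100 KiB/s minimum throughput, 10 s base timeout:
	// 1048576 / 102400 = 10.24 s of transfer time, plus the 10 s base.
	fmt.Println(requestTimeout(1<<20, 100*1024, 10*time.Second))
}
```

With the defaults, larger batches therefore get proportionally longer timeouts, while small requests never time out in less than the base value.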

func WithRetryTimeout

func WithRetryTimeout(t time.Duration) LineSenderOption

WithRetryTimeout sets the cumulative maximum duration spent in retries. Defaults to 10 seconds. Retries work great when used in combination with server-side data deduplication.

Only network-related errors and certain 5xx response codes are retryable.

Only available for the HTTP sender.
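The idea of a cumulative retry budget can be sketched as a loop that stops once the time already spent plus the next wait would exceed the timeout. retryUntil is a hypothetical helper; the doubling backoff and the choice to retry every error are assumptions of this sketch, whereas the client only retries network-related errors and certain 5xx responses.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a placeholder error used by the demonstration.
var errTransient = errors.New("transient network error")

// retryUntil calls op until it succeeds or the cumulative time spent
// would exceed timeout. It returns the number of attempts made.
func retryUntil(timeout, backoff time.Duration, op func() error) (int, error) {
	start := time.Now()
	attempts := 0
	for {
		attempts++
		err := op()
		if err == nil {
			return attempts, nil
		}
		if time.Since(start)+backoff > timeout {
			return attempts, fmt.Errorf("retry timeout after %v: %w", timeout, err)
		}
		time.Sleep(backoff)
		backoff *= 2 // assumption: simple doubling backoff
	}
}

// demo fails twice and then succeeds, well inside a one-second budget.
func demo() (int, error) {
	calls := 0
	return retryUntil(time.Second, time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
}

func main() {
	attempts, err := demo()
	fmt.Println(attempts, err)
}
```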

func WithTcp

func WithTcp() LineSenderOption

WithTcp enables ingestion over TCP protocol.

func WithTls

func WithTls() LineSenderOption

WithTls enables TLS connection encryption.

func WithTlsInsecureSkipVerify

func WithTlsInsecureSkipVerify() LineSenderOption

WithTlsInsecureSkipVerify enables TLS connection encryption, but skips server certificate verification. Useful in test environments with self-signed certificates. Do not use in production environments.

type LineSenderPool added in v3.1.0

type LineSenderPool struct {
	// contains filtered or unexported fields
}

LineSenderPool wraps a mutex-protected slice of LineSender. It allows a goroutine to Acquire a sender from the pool and Release it back to the pool when it's done being used.

WARNING: This is an experimental API that is designed to work with HTTP senders ONLY.

func PoolFromConf added in v3.1.0

func PoolFromConf(conf string, opts ...LineSenderPoolOption) (*LineSenderPool, error)

PoolFromConf instantiates a new LineSenderPool with a QuestDB configuration string. Any sender acquired from this pool will be initialized with the same configuration string that was passed into the conf argument.

The default maximum number of senders is 64, but can be customized by using the WithMaxSenders option.

func PoolFromOptions added in v3.2.0

func PoolFromOptions(opts ...LineSenderOption) (*LineSenderPool, error)

PoolFromOptions instantiates a new LineSenderPool using programmatic options. Any sender acquired from this pool will be initialized with the same options that were passed into the opts argument.

Unlike PoolFromConf, PoolFromOptions does not have the ability to customize the returned LineSenderPool. In this case, to add options (such as WithMaxSenders), you need to manually apply these options after calling this method.

// Create a PoolFromOptions with LineSender options
p, err := PoolFromOptions(
	WithHttp(),
	WithAutoFlushRows(1000000),
)

if err != nil {
	panic(err)
}

// Add Pool-level options manually
WithMaxSenders(32)(p)
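The call WithMaxSenders(32)(p) works because a pool option is just a function that takes the pool and mutates it. The sketch below shows the general functional-options pattern with hypothetical stand-in types (pool, poolOption, withMaxSenders, newPool); it mirrors the shape of LineSenderPoolOption but is not the client's code.

```go
package main

import "fmt"

// pool and poolOption are hypothetical stand-ins for LineSenderPool and
// LineSenderPoolOption.
type pool struct {
	maxSenders int
}

type poolOption func(*pool)

// withMaxSenders returns an option that sets the sender limit.
func withMaxSenders(n int) poolOption {
	return func(p *pool) { p.maxSenders = n }
}

// newPool starts from a default of 64 and applies any options passed
// at construction time.
func newPool(opts ...poolOption) *pool {
	p := &pool{maxSenders: 64}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	p := newPool()
	fmt.Println(p.maxSenders)

	// An option is an ordinary function value, so it can also be
	// applied to an existing pool after construction.
	withMaxSenders(32)(p)
	fmt.Println(p.maxSenders)
}
```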

func (*LineSenderPool) Close added in v3.1.0

func (p *LineSenderPool) Close(ctx context.Context) error

Close sets the pool's status to "closed" and closes all cached LineSenders. When LineSenders are released back into a closed pool, they will be closed and discarded.

func (*LineSenderPool) IsClosed added in v3.1.0

func (p *LineSenderPool) IsClosed() bool

IsClosed will return true if the pool is closed. Once a pool is closed, you will not be able to Acquire any new LineSenders from it. When LineSenders are released back into a closed pool, they will be closed and discarded.

func (*LineSenderPool) Len added in v3.1.0

func (p *LineSenderPool) Len() int

Len returns the number of LineSenders in the pool.

func (*LineSenderPool) Sender added in v3.2.0

func (p *LineSenderPool) Sender(ctx context.Context) (LineSender, error)

Sender obtains a LineSender from the pool. If the pool is empty, a new LineSender will be instantiated using the pool's config string. If the maximum number of senders has already been obtained from the pool, this call will block until one of the senders is returned to the pool by calling sender.Close().
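The blocking behavior described for Sender can be modeled with a mutex-protected slice plus a counting semaphore. This is a sketch of the general technique only: conn, pool, get, and put are hypothetical names, and using a buffered channel as the semaphore is an assumption, not a description of how LineSenderPool is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical pooled resource standing in for a LineSender.
type conn struct{ id int }

// pool hands out at most cap(slots) conns at a time and caches the
// ones that are returned.
type pool struct {
	mu     sync.Mutex
	free   []*conn
	nextID int
	slots  chan struct{} // counting semaphore
}

func newPool(max int) *pool {
	return &pool{slots: make(chan struct{}, max)}
}

// get blocks while the maximum number of conns is checked out, then
// reuses a cached conn or creates a new one.
func (p *pool) get() *conn {
	p.slots <- struct{}{}
	p.mu.Lock()
	defer p.mu.Unlock()
	if n := len(p.free); n > 0 {
		c := p.free[n-1]
		p.free = p.free[:n-1]
		return c
	}
	p.nextID++
	return &conn{id: p.nextID}
}

// put returns a conn to the cache and then frees its slot.
func (p *pool) put(c *conn) {
	p.mu.Lock()
	p.free = append(p.free, c)
	p.mu.Unlock()
	<-p.slots
}

// demo runs 8 goroutines against a pool of 2 and reports how many
// distinct conns were ever created.
func demo() int {
	p := newPool(2)
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c := p.get()
			defer p.put(c)
		}()
	}
	wg.Wait()
	return p.nextID
}

func main() {
	fmt.Println("conns created:", demo())
}
```

Because a new conn is only created when the cache is empty and a slot is held, the number of conns created never exceeds the pool's limit, no matter how many goroutines compete for them.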

type LineSenderPoolOption added in v3.1.0

type LineSenderPoolOption func(*LineSenderPool)

LineSenderPoolOption defines line sender pool config option.

func WithMaxSenders added in v3.1.0

func WithMaxSenders(count int) LineSenderPoolOption

WithMaxSenders sets the maximum number of senders in the pool. The default maximum number of senders is 64.

type RetryTimeoutError

type RetryTimeoutError struct {
	LastErr error
	Timeout time.Duration
}

RetryTimeoutError is an error indicating a failed flush retry attempt.

func NewRetryTimeoutError

func NewRetryTimeoutError(timeout time.Duration, lastError error) *RetryTimeoutError

NewRetryTimeoutError returns a new RetryTimeoutError error.

func (*RetryTimeoutError) Error

func (e *RetryTimeoutError) Error() string

Error returns full error message string.

Directories

Path Synopsis
examples
