questdb

package module
v4.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 26, 2025 License: Apache-2.0 Imports: 26 Imported by: 0

README

GoDoc reference

go-questdb-client

Golang client for QuestDB's Influx Line Protocol (ILP) over HTTP and TCP. This library makes it easy to insert data into QuestDB.

The library requires Go 1.19 or newer.

Features:

  • Context-aware API.
  • Optimized for batch writes.
  • Supports TLS encryption and ILP authentication.
  • Automatic write retries and connection reuse for ILP over HTTP.
  • Tested against QuestDB 7.3.10 and newer versions.

New in v4:

  • Supports n-dimensional arrays of doubles for QuestDB servers 9.0.0 and up

Documentation is available here.

Quickstart

package main

import (
	"context"
	"log"
	"time"

	qdb "github.com/questdb/go-questdb-client/v4"
)

func main() {
	ctx := context.TODO()
	// Connect to QuestDB running locally.
	sender, err := qdb.LineSenderFromConf(ctx, "http::addr=localhost:9000;")
	if err != nil {
		log.Fatal(err)
	}
	// Make sure to close the sender on exit to release resources.
	defer sender.Close(ctx)
	// Send a few ILP messages.
	err = sender.
		Table("trades").
		Symbol("symbol", "ETH-USD").
		Symbol("side", "sell").
		Float64Column("price", 2615.54).
		Float64Column("amount", 0.00044).
		AtNow(ctx) // timestamp will be set at the server side

	tradedTs, err := time.Parse(time.RFC3339, "2022-08-06T15:04:05.123456Z")
	if err != nil {
		log.Fatal(err)
	}

	// You can pass a timestamp, rather than using the AtNow call
	err = sender.
		Table("trades").
		Symbol("symbol", "BTC-USD").
		Symbol("side", "sell").
		Float64Column("price", 39269.98).
		Float64Column("amount", 0.001).
		At(ctx, tradedTs)
	if err != nil {
		log.Fatal(err)
	}

	tradedTs, err = time.Parse(time.RFC3339, "2022-08-06T15:04:06.987654Z")
	if err != nil {
		log.Fatal(err)
	}
	err = sender.
		Table("trades_go").
		Symbol("pair", "GBPJPY").
		Symbol("type", "sell").
		Float64Column("traded_price", 135.97).
		Float64Column("limit_price", 0.84).
		Int64Column("qty", 400).
		At(ctx, tradedTs)
	if err != nil {
		log.Fatal(err)
	}

	// Make sure that the messages are sent over the network.
	err = sender.Flush(ctx)
	if err != nil {
		log.Fatal(err)
	}
}

HTTP is the recommended transport to use. To connect via TCP, set the configuration string to:

	// ...
	sender, err := qdb.LineSenderFromConf(ctx, "tcp::addr=localhost:9009;")
	// ...

N-dimensional arrays

QuestDB server version 9.0.0 and newer supports n-dimensional arrays of double precision floating point numbers. The Go client provides several methods to send arrays to QuestDB:

1D Arrays
// Send a 1D array of doubles
values1D := []float64{1.1, 2.2, 3.3, 4.4}
err = sender.
    Table("measurements").
    Symbol("sensor", "temp_probe_1").
    Float64Array1DColumn("readings", values1D).
    AtNow(ctx)
2D Arrays
// Send a 2D array of doubles (must be rectangular)
values2D := [][]float64{
    {1.1, 2.2, 3.3},
    {4.4, 5.5, 6.6},
    {7.7, 8.8, 9.9},
}
err = sender.
    Table("matrix_data").
    Symbol("experiment", "test_001").
    Float64Array2DColumn("matrix", values2D).
    AtNow(ctx)
3D Arrays
// Send a 3D array of doubles (must be regular cuboid shape)
values3D := [][][]float64{
    {{1.0, 2.0}, {3.0, 4.0}},
    {{5.0, 6.0}, {7.0, 8.0}},
}
err = sender.
    Table("tensor_data").
    Symbol("model", "neural_net_v1").
    Float64Array3DColumn("weights", values3D).
    AtNow(ctx)
N-dimensional Arrays

For higher dimensions, use the NewNDArray function:

// Create a 2x3x4 array
arr, err := qdb.NewNDArray[float64](2, 3, 4)
if err != nil {
    log.Fatal(err)
}

// Fill with values
arr.Fill(1.5)

// Or set individual values
arr.Set([]uint{0, 1, 2}, 42.0)

err = sender.
    Table("ndarray_data").
    Symbol("dataset", "training_batch_1").
    Float64ArrayNDColumn("features", arr).
    AtNow(ctx)

The array data is sent over a new protocol version (2) that is auto-negotiated when using HTTP(s), or can be specified explicitly via the protocol_version=2 parameter when using TCP(s).

We recommend using HTTP(s), but here is an TCP example, should you need it:

sender, err := qdb.NewLineSender(ctx, 
    qdb.WithTcp(), 
    qdb.WithProtocolVersion(qdb.ProtocolVersion2))

When using protocol_version=2 (with either TCP(s) or HTTP(s)), the sender will now also serialize float64 (double-precision) columns as binary. You might see a performance uplift if this is a dominant data type in your ingestion workload.

Pooled Line Senders

Warning: Experimental feature designed for use with HTTP senders ONLY

Version 3 of the client introduces a LineSenderPool, which provides a mechanism to pool previously-used LineSenders so they can be reused without having to allocate and instantiate new senders.

A LineSenderPool is thread-safe and can be used to concurrently obtain senders across multiple goroutines.

Since LineSenders must be used in a single-threaded context, a typical pattern is to Acquire a sender from a LineSenderPool at the beginning of a goroutine and use a deferred execution block to Close the sender at the end of the goroutine.

Here is an example of the LineSenderPool Acquire, Release, and Close semantics:

package main

import (
	"context"

	qdb "github.com/questdb/go-questdb-client/v4"
)

func main() {
	ctx := context.TODO()

	pool := qdb.PoolFromConf("http::addr=localhost:9000")
	defer func() {
		err := pool.Close(ctx)
		if err != nil {
			panic(err)
		}
	}()

	sender, err := pool.Sender(ctx)
	if err != nil {
		panic(err)
	}

	sender.Table("prices").
		Symbol("ticker", "AAPL").
		Float64Column("price", 123.45).
		AtNow(ctx)

	// Close call returns the sender back to the pool
	if err := sender.Close(ctx); err != nil {
		panic(err)
	}
}

Community

If you need help, have additional questions or want to provide feedback, you may find in our Community Forum. You can also sign up to our mailing list to get notified of new releases.

Documentation

Index

Constants

View Source
const (
	ProtocolVersion1 protocolVersion = 1
	ProtocolVersion2 protocolVersion = 2
)
View Source
const MaxArrayElements = (1 << 28) - 1

MaxArrayElements defines the maximum total number of elements of Array

View Source
const (
	// MaxDimensions defines the maximum dims of NdArray
	MaxDimensions = 32
)

Variables

This section is empty.

Functions

This section is empty.

Types

type HttpError

type HttpError struct {
	Code    string `json:"code"`
	Message string `json:"message"`
	Line    int    `json:"line,omitempty"`
	ErrorId string `json:"errorId"`
	// contains filtered or unexported fields
}

HttpError is a server-sent error message.

func (*HttpError) Error

func (e *HttpError) Error() string

Error returns full error message string.

func (*HttpError) HttpStatus

func (e *HttpError) HttpStatus() int

HttpStatus returns error HTTP status code.

type InvalidConfigStrError

type InvalidConfigStrError struct {
	// contains filtered or unexported fields
}

InvalidConfigStrError is error indicating invalid config string.

func NewInvalidConfigStrError

func NewInvalidConfigStrError(msg string, args ...interface{}) *InvalidConfigStrError

NewInvalidConfigStrError creates new InvalidConfigStrError.

func (*InvalidConfigStrError) Error

func (e *InvalidConfigStrError) Error() string

Error returns full error message string.

type LineSender

type LineSender interface {
	// Table sets the table name (metric) for a new ILP message. Should be
	// called before any Symbol or Column method.
	//
	// Table name cannot contain any of the following characters:
	// '\n', '\r', '?', ',', ”', '"', '\', '/', ':', ')', '(', '+', '*',
	// '%', '~', starting '.', trailing '.', or a non-printable char.
	Table(name string) LineSender

	// Symbol adds a symbol column value to the ILP message. Should be called
	// before any Column method.
	//
	// Symbol name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+',
	// '-', '*' '%%', '~', or a non-printable char.
	Symbol(name, val string) LineSender

	// Int64Column adds a 64-bit integer (long) column value to the ILP
	// message.
	//
	// Column name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+',
	// '-', '*' '%%', '~', or a non-printable char.
	Int64Column(name string, val int64) LineSender

	// Long256Column adds a 256-bit unsigned integer (long256) column
	// value to the ILP message.
	//
	// Only non-negative numbers that fit into 256-bit unsigned integer are
	// supported and any other input value would lead to an error.
	//
	// Column name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+',
	// '-', '*' '%%', '~', or a non-printable char.
	Long256Column(name string, val *big.Int) LineSender

	// TimestampColumn adds a timestamp column value to the ILP
	// message.
	//
	// Should be used only for non-designated timestamp column.
	// Designated timestamp column values should be passed to At/AtNow.
	//
	// Column name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+',
	// '-', '*' '%%', '~', or a non-printable char.
	TimestampColumn(name string, ts time.Time) LineSender

	// Float64Column adds a 64-bit float (double) column value to the ILP
	// message.
	//
	// Column name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+',
	// '-', '*' '%%', '~', or a non-printable char.
	Float64Column(name string, val float64) LineSender

	// StringColumn adds a string column value to the ILP message.
	//
	// Column name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+',
	// '-', '*' '%%', '~', or a non-printable char.
	StringColumn(name, val string) LineSender

	// BoolColumn adds a boolean column value to the ILP message.
	//
	// Column name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+',
	// '-', '*' '%%', '~', or a non-printable char.
	BoolColumn(name string, val bool) LineSender

	// Float64Array1DColumn adds an array of 64-bit floats (double array) to the ILP message.
	//
	// Column name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', "', '"', '\', '/', ':', ')', '(', '+',
	// '-', '*' '%%', '~', or a non-printable char.
	Float64Array1DColumn(name string, values []float64) LineSender

	// Float64Array2DColumn adds a 2D array of 64-bit floats (double 2D array) to the ILP message.
	//
	// The values parameter must have a regular (rectangular) shape - all rows must have
	// exactly the same length. If the array has irregular shape, this method returns an error.
	//
	// Example of valid input:
	//   values := [][]float64{{1.0, 2.0}, {3.0, 4.0}, {5.0, 6.0}}  // 3x2 regular shape
	//
	// Example of invalid input:
	//   values := [][]float64{{1.0, 2.0}, {3.0}, {4.0, 5.0, 6.0}}   // irregular shape - returns error
	//
	// Column name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', "', '"', '\', '/', ':', ')', '(', '+',
	// '-', '*' '%%', '~', or a non-printable char.
	Float64Array2DColumn(name string, values [][]float64) LineSender

	// Float64Array3DColumn adds a 3D array of 64-bit floats (double 3D array) to the ILP message.
	//
	// The values parameter must have a regular (cuboid) shape - all dimensions must have
	// consistent sizes throughout. If the array has irregular shape, this method returns an error.
	//
	// Example of valid input:
	//   values := [][][]float64{
	//     {{1.0, 2.0}, {3.0, 4.0}},    // 2x2 matrix
	//     {{5.0, 6.0}, {7.0, 8.0}},    // 2x2 matrix (same shape)
	//   }  // 2x2x2 regular shape
	//
	// Example of invalid input:
	//   values := [][][]float64{
	//     {{1.0, 2.0}, {3.0, 4.0}},    // 2x2 matrix
	//     {{5.0}, {6.0, 7.0, 8.0}},    // irregular matrix - returns error
	//   }
	//
	// Column name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', "', '"', '\', '/', ':', ')', '(', '+',
	// '-', '*' '%%', '~', or a non-printable char.
	Float64Array3DColumn(name string, values [][][]float64) LineSender

	// Float64ArrayNDColumn adds an n-dimensional array of 64-bit floats (double n-D array) to the ILP message.
	//
	// Example usage:
	//   // Create a 2x3x4 array
	//   arr, _ := questdb.NewNDArray[float64](2, 3, 4)
	//   arr.Fill(1.5)
	//   sender.Float64ArrayNDColumn("ndarray_col", arr)
	//
	// Column name cannot contain any of the following characters:
	// '\n', '\r', '?', '.', ',', "', '"', '\', '/', ':', ')', '(', '+',
	// '-', '*' '%%', '~', or a non-printable char.
	Float64ArrayNDColumn(name string, values *NdArray[float64]) LineSender

	// At sets the designated timestamp value and finalizes the ILP
	// message.
	//
	// If the underlying buffer reaches configured capacity or the
	// number of buffered messages exceeds the auto-flush trigger, this
	// method also sends the accumulated messages.
	//
	// If ts.IsZero(), no timestamp is sent to the server.
	At(ctx context.Context, ts time.Time) error

	// AtNow omits designated timestamp value and finalizes the ILP
	// message. The server will insert each message using the system
	// clock as the row timestamp.
	//
	// If the underlying buffer reaches configured capacity or the
	// number of buffered messages exceeds the auto-flush trigger, this
	// method also sends the accumulated messages.
	AtNow(ctx context.Context) error

	// Flush sends the accumulated messages via the underlying
	// connection. Should be called periodically to make sure that
	// all messages are sent to the server.
	//
	// For optimal performance, this method should not be called after
	// each ILP message. Instead, the messages should be written in
	// batches followed by a Flush call. The optimal batch size may
	// vary from one thousand to few thousand messages depending on
	// the message size.
	Flush(ctx context.Context) error

	// Close closes the underlying HTTP client.
	//
	// If auto-flush is enabled, the client will flush any remaining buffered
	// messages before closing itself.
	Close(ctx context.Context) error
}

LineSender allows you to insert rows into QuestDB by sending ILP messages over HTTP or TCP protocol.

Each sender corresponds to a single client-server connection. A sender should not be called concurrently by multiple goroutines.

HTTP senders reuse connections from a global pool by default. You can customize the HTTP transport by passing a custom http.Transport to the WithHttpTransport option.

func LineSenderFromConf

func LineSenderFromConf(ctx context.Context, conf string) (LineSender, error)

LineSenderFromConf creates a LineSender using the QuestDB config string format.

Example config string: "http::addr=localhost;username=admin;password=quest;auto_flush_rows=1000;"

QuestDB ILP clients use a common key-value configuration string format across all implementations. We opted for this config over a URL because it reduces the amount of character escaping required for paths and base64-encoded param values.

The config string format is as follows:

schema::key1=value1;key2=value2;key3=value3;

Schemas supported are "http", "https", "tcp", "tcps"

Options: http(s) and tcp(s):

-------------------

addr: hostname/port of QuestDB endpoint

init_buf_size: initial growable ILP buffer size in bytes (defaults to 128KiB)

tls_verify: determines if TLS certificates should be validated (defaults to "on", can be set to "unsafe_off")

http(s)-only

------------

username: for basic authentication

password: for basic authentication

token: bearer token auth (used instead of basic authentication)

auto_flush: determines if auto-flushing is enabled (values "on" or "off", defaults to "on")

auto_flush_rows: auto-flushing is triggered above this row count (defaults to 75000). If set, explicitly implies auto_flush=on. Set to 'off' to disable.

auto_flush_interval: auto-flushing is triggered above this time, in milliseconds (defaults to 1000 milliseconds). If set, explicitly implies auto_flush=on. Set to 'off' to disable.

request_min_throughput: bytes per second, used to calculate each request's timeout (defaults to 100KiB/s)

request_timeout: minimum request timeout in milliseconds (defaults to 10 seconds)

retry_timeout: cumulative maximum millisecond duration spent in retries (defaults to 10 seconds)

max_buf_size: buffer growth limit in bytes. Client errors if breached (default is 100MiB)

tcp(s)-only

-----------

username: KID (key ID) for ECDSA authentication

token: Secret K (D) for ECDSA authentication

func LineSenderFromEnv

func LineSenderFromEnv(ctx context.Context) (LineSender, error)

LineSenderFromEnv creates a LineSender with a config string defined by the QDB_CLIENT_CONF environment variable. See LineSenderFromConf for the config string format.

This is a convenience method suitable for Cloud environments.

func NewLineSender

func NewLineSender(ctx context.Context, opts ...LineSenderOption) (LineSender, error)

NewLineSender creates new InfluxDB Line Protocol (ILP) sender. Each sender corresponds to a single client connection. LineSender should not be called concurrently by multiple goroutines.

type LineSenderOption

type LineSenderOption func(*lineSenderConfig)

LineSenderOption defines line sender config option.

func WithAddress

func WithAddress(addr string) LineSenderOption

WithAddress sets address to connect to. Should be in the "host:port" format. Defaults to "127.0.0.1:9000" in case of HTTP and "127.0.0.1:9009" in case of TCP.

func WithAuth

func WithAuth(tokenId, token string) LineSenderOption

WithAuth sets token (private key) used for ILP authentication.

Only available for the TCP sender.

func WithAutoFlushDisabled

func WithAutoFlushDisabled() LineSenderOption

WithAutoFlushDisabled turns off auto-flushing behavior. To send ILP messages, the user must call Flush().

Only available for the HTTP sender.

func WithAutoFlushInterval

func WithAutoFlushInterval(interval time.Duration) LineSenderOption

WithAutoFlushInterval the interval at which the Sender automatically flushes its buffer. Defaults to 1 second.

Only available for the HTTP sender.

func WithAutoFlushRows

func WithAutoFlushRows(rows int) LineSenderOption

WithAutoFlushRows sets the number of buffered rows that must be breached in order to trigger an auto-flush. Defaults to 75000.

Only available for the HTTP sender.

func WithBasicAuth

func WithBasicAuth(user, pass string) LineSenderOption

WithBasicAuth sets a Basic authentication header for ILP requests over HTTP.

Only available for the HTTP sender.

func WithBearerToken

func WithBearerToken(token string) LineSenderOption

WithBearerToken sets a Bearer token Authentication header for ILP requests.

Only available for the HTTP sender.

func WithFileNameLimit

func WithFileNameLimit(limit int) LineSenderOption

WithFileNameLimit sets maximum file name length in chars allowed by the server. Affects maximum table and column name lengths accepted by the sender. Should be set to the same value as on the server. Defaults to 127.

func WithHttp

func WithHttp() LineSenderOption

WithHttp enables ingestion over HTTP protocol.

func WithHttpTransport

func WithHttpTransport(t *http.Transport) LineSenderOption

WithHttpTransport sets the client's http transport to the passed pointer instead of the global transport. This can be used for customizing the http transport used by the LineSender. For example to set custom timeouts, TLS settings, etc. WithTlsInsecureSkipVerify is ignored when this option is in use.

Only available for the HTTP sender.

func WithInitBufferSize

func WithInitBufferSize(sizeInBytes int) LineSenderOption

WithInitBufferSize sets the desired initial buffer capacity in bytes to be used when sending ILP messages. Defaults to 128KB.

This setting is a soft limit, i.e. the underlying buffer may grow larger than the provided value.

func WithMaxBufferSize

func WithMaxBufferSize(sizeInBytes int) LineSenderOption

WithMaxBufferSize sets the maximum buffer capacity in bytes to be used when sending ILP messages. The sender will return an error if the limit is reached. Defaults to 100MB.

Only available for the HTTP sender.

func WithMinThroughput

func WithMinThroughput(bytesPerSecond int) LineSenderOption

WithMinThroughput is used in combination with request_timeout to set the timeout of an ILP request. Defaults to 100KiB/s.

timeout = (request.len() / request_min_throughput) + request_timeout

Only available for the HTTP sender.

func WithProtocolVersion

func WithProtocolVersion(version protocolVersion) LineSenderOption

WithProtocolVersion sets the ingestion protocol version.

  • HTTP transport automatically negotiates the protocol version by default(unset, STRONGLY RECOMMENDED). You can explicitly configure the protocol version to avoid the slight latency cost at connection time.
  • TCP transport does not negotiate the protocol version and uses ProtocolVersion1 by default. You must explicitly set ProtocolVersion2 in order to ingest arrays.

NOTE: QuestDB server version 9.0.0 or later is required for ProtocolVersion2.

func WithRequestTimeout

func WithRequestTimeout(timeout time.Duration) LineSenderOption

WithRequestTimeout is used in combination with request_min_throughput to set the timeout of an ILP request. Defaults to 10 seconds.

timeout = (request.len() / request_min_throughput) + request_timeout

Only available for the HTTP sender.

func WithRetryTimeout

func WithRetryTimeout(t time.Duration) LineSenderOption

WithRetryTimeout is the cumulative maximum duration spend in retries. Defaults to 10 seconds. Retries work great when used in combination with server-side data deduplication.

Only network-related errors and certain 5xx response codes are retryable.

Only available for the HTTP sender.

func WithTcp

func WithTcp() LineSenderOption

WithTcp enables ingestion over TCP protocol.

func WithTls

func WithTls() LineSenderOption

WithTls enables TLS connection encryption.

func WithTlsInsecureSkipVerify

func WithTlsInsecureSkipVerify() LineSenderOption

WithTlsInsecureSkipVerify enables TLS connection encryption, but skips server certificate verification. Useful in test environments with self-signed certificates. Do not use in production environments.

type LineSenderPool

type LineSenderPool struct {
	// contains filtered or unexported fields
}

LineSenderPool wraps a mutex-protected slice of LineSender. It allows a goroutine to Acquire a sender from the pool and Release it back to the pool when it's done being used.

WARNING: This is an experimental API that is designed to work with HTTP senders ONLY.

func PoolFromConf

func PoolFromConf(conf string, opts ...LineSenderPoolOption) (*LineSenderPool, error)

PoolFromConf instantiates a new LineSenderPool with a QuestDB configuration string. Any sender acquired from this pool will be initialized with the same configuration string that was passed into the conf argument.

The default maximum number of senders is 64, but can be customized by using the WithMaxSenders option.

func PoolFromOptions

func PoolFromOptions(opts ...LineSenderOption) (*LineSenderPool, error)

PoolFromOptions instantiates a new LineSenderPool using programmatic options. Any sender acquired from this pool will be initialized with the same options that were passed into the opts argument.

Unlike PoolFromConf, PoolFromOptions does not have the ability to customize the returned LineSenderPool. In this case, to add options (such as WithMaxSenders), you need manually apply these options after calling this method.

// Create a PoolFromOptions with LineSender options
p, err := PoolFromOptions(
	WithHttp(),
	WithAutoFlushRows(1000000),
)

if err != nil {
	panic(err)
}

// Add Pool-level options manually
WithMaxSenders(32)(p)

func (*LineSenderPool) Close

func (p *LineSenderPool) Close(ctx context.Context) error

Close sets the pool's status to "closed" and closes all cached LineSenders. When LineSenders are released back into a closed pool, they will be closed and discarded.

func (*LineSenderPool) IsClosed

func (p *LineSenderPool) IsClosed() bool

IsClosed will return true if the pool is closed. Once a pool is closed, you will not be able to Acquire any new LineSenders from it. When LineSenders are released back into a closed pool, they will be closed and discarded.

func (*LineSenderPool) Len

func (p *LineSenderPool) Len() int

Len returns the number of LineSenders in the pool.

func (*LineSenderPool) Sender

func (p *LineSenderPool) Sender(ctx context.Context) (LineSender, error)

Sender obtains a LineSender from the pool. If the pool is empty, a new LineSender will be instantiated using the pool's config string. If there is already maximum number of senders obtained from the pool, this call will block until one of the senders is returned back to the pool by calling sender.Close().

type LineSenderPoolOption

type LineSenderPoolOption func(*LineSenderPool)

LineSenderPoolOption defines line sender pool config option.

func WithMaxSenders

func WithMaxSenders(count int) LineSenderPoolOption

WithMaxSenders sets the maximum number of senders in the pool. The default maximum number of senders is 64.

type NdArray

type NdArray[T NdArrayElementType] struct {
	// contains filtered or unexported fields
}

NdArray represents a generic n-dimensional array with shape validation. It's designed to be used with the [LineSender.Float64ArrayNDColumn] method for sending multi-dimensional arrays to QuestDB via the ILP protocol.

NdArray instances are meant to be reused across multiple calls to the sender to avoid memory allocations. Use Append to populate data and ResetAppendIndex to reset the array for reuse after sending data.

By default, all values in the array are initialized to zero.

func NewNDArray

func NewNDArray[T NdArrayElementType](shape ...uint) (*NdArray[T], error)

NewNDArray creates a new NdArray with the specified shape. All elements are initialized to zero by default.

func (*NdArray[T]) Append

func (n *NdArray[T]) Append(val T) (bool, error)

Append adds a value to the array sequentially at the current append index. Returns true if there's more space for additional values, false if the array is now full. Use ResetAppendIndex() to reuse the array for multiple ILP messages.

Example:

arr, _ := NewNDArray[float64](2, 3) // 2x3 array (6 elements total)
hasMore, _ := arr.Append(1.0)       // hasMore = true, index now at 1
hasMore, _ = arr.Append(2.0)        // hasMore = true, index now at 2
// ... append 4 more values
hasMore, _ = arr.Append(6.0)        // hasMore = false, array is full

// To reuse the array:
arr.ResetAppendIndex()
arr.Append(10.0)                    // overwrites
...

func (*NdArray[T]) Data

func (n *NdArray[T]) Data() []T

Data returns the underlying data slice

func (*NdArray[T]) Fill

func (n *NdArray[T]) Fill(value T)

Fill fills the entire array with the specified value

func (*NdArray[T]) Get

func (n *NdArray[T]) Get(positions ...uint) (T, error)

Get retrieves a value at the specified multi-dimensional position

func (*NdArray[T]) NDims

func (n *NdArray[T]) NDims() int

NDims returns the number of dimensions

func (*NdArray[T]) ResetAppendIndex

func (n *NdArray[T]) ResetAppendIndex()

ResetAppendIndex resets the append index to 0, allowing the NdArray to be reused for multiple append operations. This is useful for reusing arrays across multiple messages/rows ingestion without reallocating memory.

Example:

arr, _ := NewNDArray[float64](2) // 1D array with 3 elements
arr.Append(2.0)
arr.Append(3.0) // array is now full

// sender.Float64ArrayNDColumn(arr)

arr.ResetAppendIndex() // reset for reuse
arr.Append(4.0)
arr.Append(5.0)

func (*NdArray[T]) Reshape

func (n *NdArray[T]) Reshape(newShape ...uint) (*NdArray[T], error)

Reshape creates a new NdArray with a different shape but same data

func (*NdArray[T]) Set

func (n *NdArray[T]) Set(v T, positions ...uint) error

Set sets a value at the specified multi-dimensional position

func (*NdArray[T]) Shape

func (n *NdArray[T]) Shape() []uint

Shape returns a copy of the array's shape

func (*NdArray[T]) Size

func (n *NdArray[T]) Size() int

Size returns the total number of elements

type NdArrayElementType

type NdArrayElementType interface {
	~float64
}

NdArrayElementType represents the constraint for numeric types that can be used in NdArray

type RetryTimeoutError

type RetryTimeoutError struct {
	LastErr error
	Timeout time.Duration
}

RetryTimeoutError is error indicating failed flush retry attempt.

func NewRetryTimeoutError

func NewRetryTimeoutError(timeout time.Duration, lastError error) *RetryTimeoutError

NewRetryTimeoutError returns a new RetryTimeoutError error.

func (*RetryTimeoutError) Error

func (e *RetryTimeoutError) Error() string

Error returns full error message string.

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL