Documentation ¶
Index ¶
- type HttpError
- type InvalidConfigStrError
- type LineSender
- type LineSenderOption
- func WithAddress(addr string) LineSenderOption
- func WithAuth(tokenId, token string) LineSenderOption
- func WithAutoFlushDisabled() LineSenderOption
- func WithAutoFlushInterval(interval time.Duration) LineSenderOption
- func WithAutoFlushRows(rows int) LineSenderOption
- func WithBasicAuth(user, pass string) LineSenderOption
- func WithBearerToken(token string) LineSenderOption
- func WithFileNameLimit(limit int) LineSenderOption
- func WithHttp() LineSenderOption
- func WithHttpTransport(t *http.Transport) LineSenderOption
- func WithInitBufferSize(sizeInBytes int) LineSenderOption
- func WithMaxBufferSize(sizeInBytes int) LineSenderOption
- func WithMinThroughput(bytesPerSecond int) LineSenderOption
- func WithRequestTimeout(timeout time.Duration) LineSenderOption
- func WithRetryTimeout(t time.Duration) LineSenderOption
- func WithTcp() LineSenderOption
- func WithTls() LineSenderOption
- func WithTlsInsecureSkipVerify() LineSenderOption
- type LineSenderPool
- type LineSenderPoolOption
- type RetryTimeoutError
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type HttpError ¶
type HttpError struct { Code string `json:"code"` Message string `json:"message"` Line int `json:"line,omitempty"` ErrorId string `json:"errorId"` // contains filtered or unexported fields }
HttpError is a server-sent error message.
func (*HttpError) HttpStatus ¶
HttpStatus returns the HTTP status code of the error response.
type InvalidConfigStrError ¶
type InvalidConfigStrError struct {
// contains filtered or unexported fields
}
InvalidConfigStrError is an error indicating an invalid config string.
func NewInvalidConfigStrError ¶
func NewInvalidConfigStrError(msg string, args ...interface{}) *InvalidConfigStrError
NewInvalidConfigStrError creates a new InvalidConfigStrError.
func (*InvalidConfigStrError) Error ¶
func (e *InvalidConfigStrError) Error() string
Error returns full error message string.
type LineSender ¶
type LineSender interface { // Table sets the table name (metric) for a new ILP message. Should be // called before any Symbol or Column method. // // Table name cannot contain any of the following characters: // '\n', '\r', '?', ',', ”', '"', '\', '/', ':', ')', '(', '+', '*', // '%', '~', starting '.', trailing '.', or a non-printable char. Table(name string) LineSender // Symbol adds a symbol column value to the ILP message. Should be called // before any Column method. // // Symbol name cannot contain any of the following characters: // '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+', // '-', '*' '%%', '~', or a non-printable char. Symbol(name, val string) LineSender // Int64Column adds a 64-bit integer (long) column value to the ILP // message. // // Column name cannot contain any of the following characters: // '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+', // '-', '*' '%%', '~', or a non-printable char. Int64Column(name string, val int64) LineSender // Long256Column adds a 256-bit unsigned integer (long256) column // value to the ILP message. // // Only non-negative numbers that fit into 256-bit unsigned integer are // supported and any other input value would lead to an error. // // Column name cannot contain any of the following characters: // '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+', // '-', '*' '%%', '~', or a non-printable char. Long256Column(name string, val *big.Int) LineSender // TimestampColumn adds a timestamp column value to the ILP // message. // // Should be used only for non-designated timestamp column. // Designated timestamp column values should be passed to At/AtNow. // // Column name cannot contain any of the following characters: // '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+', // '-', '*' '%%', '~', or a non-printable char. 
TimestampColumn(name string, ts time.Time) LineSender // Float64Column adds a 64-bit float (double) column value to the ILP // message. // // Column name cannot contain any of the following characters: // '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+', // '-', '*' '%%', '~', or a non-printable char. Float64Column(name string, val float64) LineSender // StringColumn adds a string column value to the ILP message. // // Column name cannot contain any of the following characters: // '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+', // '-', '*' '%%', '~', or a non-printable char. StringColumn(name, val string) LineSender // BoolColumn adds a boolean column value to the ILP message. // // Column name cannot contain any of the following characters: // '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+', // '-', '*' '%%', '~', or a non-printable char. BoolColumn(name string, val bool) LineSender // At sets the designated timestamp value and finalizes the ILP // message. // // If the underlying buffer reaches configured capacity or the // number of buffered messages exceeds the auto-flush trigger, this // method also sends the accumulated messages. // // If ts.IsZero(), no timestamp is sent to the server. At(ctx context.Context, ts time.Time) error // AtNow omits designated timestamp value and finalizes the ILP // message. The server will insert each message using the system // clock as the row timestamp. // // If the underlying buffer reaches configured capacity or the // number of buffered messages exceeds the auto-flush trigger, this // method also sends the accumulated messages. AtNow(ctx context.Context) error // Flush sends the accumulated messages via the underlying // connection. Should be called periodically to make sure that // all messages are sent to the server. // // For optimal performance, this method should not be called after // each ILP message. 
Instead, the messages should be written in // batches followed by a Flush call. The optimal batch size may // vary from one thousand to few thousand messages depending on // the message size. Flush(ctx context.Context) error // Close closes the underlying HTTP client. // // If auto-flush is enabled, the client will flush any remaining buffered // messages before closing itself. Close(ctx context.Context) error }
LineSender allows you to insert rows into QuestDB by sending ILP messages over HTTP or TCP protocol.
Each sender corresponds to a single client-server connection. A sender should not be called concurrently by multiple goroutines.
HTTP senders reuse connections from a global pool by default. You can customize the HTTP transport by passing a custom http.Transport to the WithHttpTransport option.
func LineSenderFromConf ¶
func LineSenderFromConf(ctx context.Context, conf string) (LineSender, error)
LineSenderFromConf creates a LineSender using the QuestDB config string format.
Example config string: "http::addr=localhost;username=joe;password=123;auto_flush_rows=1000;"
QuestDB ILP clients use a common key-value configuration string format across all implementations. We opted for this config over a URL because it reduces the amount of character escaping required for paths and base64-encoded param values.
The config string format is as follows:
schema::key1=value1;key2=value2;key3=value3;
Schemas supported are "http", "https", "tcp", "tcps"
Options: http(s) and tcp(s):
-------------------
addr: hostname/port of QuestDB endpoint
init_buf_size: initial growable ILP buffer size in bytes (defaults to 128KiB)
tls_verify: determines if TLS certificates should be validated (defaults to "on", can be set to "unsafe_off")
http(s)-only
------------
username: for basic authentication
password: for basic authentication
token: bearer token auth (used instead of basic authentication)
auto_flush: determines if auto-flushing is enabled (values "on" or "off", defaults to "on")
auto_flush_rows: auto-flushing is triggered above this row count (defaults to 75000). If set, explicitly implies auto_flush=on. Set to 'off' to disable.
auto_flush_interval: auto-flushing is triggered once this much time, in milliseconds, has elapsed (defaults to 1000 milliseconds). If set, explicitly implies auto_flush=on. Set to 'off' to disable.
request_min_throughput: bytes per second, used to calculate each request's timeout (defaults to 100KiB/s)
request_timeout: minimum request timeout in milliseconds (defaults to 10 seconds)
retry_timeout: cumulative maximum millisecond duration spent in retries (defaults to 10 seconds)
max_buf_size: buffer growth limit in bytes. Client errors if breached (default is 100MiB)
tcp(s)-only
-----------
username: KID (key ID) for ECDSA authentication
token: Secret K (D) for ECDSA authentication
func LineSenderFromEnv ¶ added in v3.0.4
func LineSenderFromEnv(ctx context.Context) (LineSender, error)
LineSenderFromEnv creates a LineSender with a config string defined by the QDB_CLIENT_CONF environment variable. See LineSenderFromConf for the config string format.
This is a convenience method suitable for Cloud environments.
func NewLineSender ¶
func NewLineSender(ctx context.Context, opts ...LineSenderOption) (LineSender, error)
NewLineSender creates a new InfluxDB Line Protocol (ILP) sender. Each sender corresponds to a single client connection. LineSender should not be called concurrently by multiple goroutines.
type LineSenderOption ¶
type LineSenderOption func(*lineSenderConfig)
LineSenderOption defines line sender config option.
func WithAddress ¶
func WithAddress(addr string) LineSenderOption
WithAddress sets address to connect to. Should be in the "host:port" format. Defaults to "127.0.0.1:9000" in case of HTTP and "127.0.0.1:9009" in case of TCP.
func WithAuth ¶
func WithAuth(tokenId, token string) LineSenderOption
WithAuth sets token (private key) used for ILP authentication.
Only available for the TCP sender.
func WithAutoFlushDisabled ¶
func WithAutoFlushDisabled() LineSenderOption
WithAutoFlushDisabled turns off auto-flushing behavior. To send ILP messages, the user must call Flush().
Only available for the HTTP sender.
func WithAutoFlushInterval ¶
func WithAutoFlushInterval(interval time.Duration) LineSenderOption
WithAutoFlushInterval sets the interval at which the Sender automatically flushes its buffer. Defaults to 1 second.
Only available for the HTTP sender.
func WithAutoFlushRows ¶
func WithAutoFlushRows(rows int) LineSenderOption
WithAutoFlushRows sets the number of buffered rows that must be breached in order to trigger an auto-flush. Defaults to 75000.
Only available for the HTTP sender.
func WithBasicAuth ¶
func WithBasicAuth(user, pass string) LineSenderOption
WithBasicAuth sets a Basic authentication header for ILP requests over HTTP.
Only available for the HTTP sender.
func WithBearerToken ¶
func WithBearerToken(token string) LineSenderOption
WithBearerToken sets a Bearer token Authentication header for ILP requests.
Only available for the HTTP sender.
func WithFileNameLimit ¶
func WithFileNameLimit(limit int) LineSenderOption
WithFileNameLimit sets maximum file name length in chars allowed by the server. Affects maximum table and column name lengths accepted by the sender. Should be set to the same value as on the server. Defaults to 127.
func WithHttpTransport ¶
func WithHttpTransport(t *http.Transport) LineSenderOption
WithHttpTransport sets the client's http transport to the passed pointer instead of the global transport. This can be used for customizing the http transport used by the LineSender. For example to set custom timeouts, TLS settings, etc. WithTlsInsecureSkipVerify is ignored when this option is in use.
Only available for the HTTP sender.
func WithInitBufferSize ¶
func WithInitBufferSize(sizeInBytes int) LineSenderOption
WithInitBufferSize sets the desired initial buffer capacity in bytes to be used when sending ILP messages. Defaults to 128KiB.
This setting is a soft limit, i.e. the underlying buffer may grow larger than the provided value.
func WithMaxBufferSize ¶
func WithMaxBufferSize(sizeInBytes int) LineSenderOption
WithMaxBufferSize sets the maximum buffer capacity in bytes to be used when sending ILP messages. The sender will return an error if the limit is reached. Defaults to 100MiB.
Only available for the HTTP sender.
func WithMinThroughput ¶
func WithMinThroughput(bytesPerSecond int) LineSenderOption
WithMinThroughput is used in combination with request_timeout to set the timeout of an ILP request. Defaults to 100KiB/s.
timeout = (request.len() / request_min_throughput) + request_timeout
Only available for the HTTP sender.
func WithRequestTimeout ¶
func WithRequestTimeout(timeout time.Duration) LineSenderOption
WithRequestTimeout is used in combination with request_min_throughput to set the timeout of an ILP request. Defaults to 10 seconds.
timeout = (request.len() / request_min_throughput) + request_timeout
Only available for the HTTP sender.
func WithRetryTimeout ¶
func WithRetryTimeout(t time.Duration) LineSenderOption
WithRetryTimeout sets the cumulative maximum duration spent in retries. Defaults to 10 seconds. Retries work great when used in combination with server-side data deduplication.
Only network-related errors and certain 5xx response codes are retryable.
Only available for the HTTP sender.
func WithTlsInsecureSkipVerify ¶
func WithTlsInsecureSkipVerify() LineSenderOption
WithTlsInsecureSkipVerify enables TLS connection encryption, but skips server certificate verification. Useful in test environments with self-signed certificates. Do not use in production environments.
type LineSenderPool ¶ added in v3.1.0
type LineSenderPool struct {
// contains filtered or unexported fields
}
LineSenderPool wraps a mutex-protected slice of LineSender. It allows a goroutine to Acquire a sender from the pool and Release it back to the pool when it's done being used.
WARNING: This is an experimental API that is designed to work with HTTP senders ONLY.
func PoolFromConf ¶ added in v3.1.0
func PoolFromConf(conf string, opts ...LineSenderPoolOption) (*LineSenderPool, error)
PoolFromConf instantiates a new LineSenderPool with a QuestDB configuration string. Any sender acquired from this pool will be initialized with the same configuration string that was passed into the conf argument.
The default maximum number of senders is 64, but can be customized by using the WithMaxSenders option.
func PoolFromOptions ¶ added in v3.2.0
func PoolFromOptions(opts ...LineSenderOption) (*LineSenderPool, error)
PoolFromOptions instantiates a new LineSenderPool using programmatic options. Any sender acquired from this pool will be initialized with the same options that were passed into the opts argument.
Unlike PoolFromConf, PoolFromOptions does not have the ability to customize the returned LineSenderPool. In this case, to add options (such as WithMaxSenders), you need to manually apply these options after calling this method.
// Create a PoolFromOptions with LineSender options
p, err := PoolFromOptions(
	WithHttp(),
	WithAutoFlushRows(1000000),
)
if err != nil {
	panic(err)
}

// Add Pool-level options manually
WithMaxSenders(32)(p)
func (*LineSenderPool) Close ¶ added in v3.1.0
func (p *LineSenderPool) Close(ctx context.Context) error
Close sets the pool's status to "closed" and closes all cached LineSenders. When LineSenders are released back into a closed pool, they will be closed and discarded.
func (*LineSenderPool) IsClosed ¶ added in v3.1.0
func (p *LineSenderPool) IsClosed() bool
IsClosed will return true if the pool is closed. Once a pool is closed, you will not be able to Acquire any new LineSenders from it. When LineSenders are released back into a closed pool, they will be closed and discarded.
func (*LineSenderPool) Len ¶ added in v3.1.0
func (p *LineSenderPool) Len() int
Len returns the number of LineSenders in the pool.
func (*LineSenderPool) Sender ¶ added in v3.2.0
func (p *LineSenderPool) Sender(ctx context.Context) (LineSender, error)
Sender obtains a LineSender from the pool. If the pool is empty, a new LineSender will be instantiated using the pool's config string. If the maximum number of senders has already been obtained from the pool, this call will block until one of the senders is returned to the pool by calling sender.Close().
type LineSenderPoolOption ¶ added in v3.1.0
type LineSenderPoolOption func(*LineSenderPool)
LineSenderPoolOption defines line sender pool config option.
func WithMaxSenders ¶ added in v3.1.0
func WithMaxSenders(count int) LineSenderPoolOption
WithMaxSenders sets the maximum number of senders in the pool. The default maximum number of senders is 64.
type RetryTimeoutError ¶
RetryTimeoutError is an error indicating a failed flush retry attempt.
func NewRetryTimeoutError ¶
func NewRetryTimeoutError(timeout time.Duration, lastError error) *RetryTimeoutError
NewRetryTimeoutError returns a new RetryTimeoutError error.
func (*RetryTimeoutError) Error ¶
func (e *RetryTimeoutError) Error() string
Error returns full error message string.