influxdb2

package module
v0.0.0-...-4a99b6d
Published: Apr 1, 2020 License: MIT Imports: 28 Imported by: 0

README

InfluxDB Client Go

This repository contains the reference Go client for InfluxDB 2.

Features

Installation

Go 1.13 or later is required.

Add import "github.com/bonitoo-io/influxdb-client-go" to your source code and sync dependencies, or edit go.mod directly.

Usage

A basic example with a blocking write and a flux query:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/bonitoo-io/influxdb-client-go"
)

func main() {
    // Create a new client using default options; authenticate with a token
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    // Use the blocking write client for writes to the desired bucket
    writeApi := client.WriteApiBlocking("my-org", "my-bucket")
    // Create a point using the full-params constructor
    p := influxdb2.NewPoint("stat",
        map[string]string{"unit": "temperature"},
        map[string]interface{}{"avg": 24.5, "max": 45},
        time.Now())
    // Write the point immediately
    writeApi.WritePoint(context.Background(), p)
    // Create a point using the fluent style
    p = influxdb2.NewPointWithMeasurement("stat").
        AddTag("unit", "temperature").
        AddField("avg", 23.2).
        AddField("max", 45).
        SetTime(time.Now())
    // Write immediately
    writeApi.WritePoint(context.Background(), p)

    // Or write line protocol directly
    line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0)
    writeApi.WriteRecord(context.Background(), line)

    // Get the query client
    queryApi := client.QueryApi("my-org")
    // Get a parsed flux query result
    result, err := queryApi.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
    if err == nil {
        // Use Next() to iterate over query result lines
        for result.Next() {
            // Observe when a new grouping key produces a new table
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }
            // Read the result
            fmt.Printf("row: %s\n", result.Record().String())
        }
        if result.Err() != nil {
            fmt.Printf("Query error: %s\n", result.Err().Error())
        }
    }
}
Options

The client uses a set of options to configure its behavior. These are available in the Options object. Create a client with default options:

client := influxdb2.NewClient("http://localhost:9999", "my-token")

To set different configuration values, e.g. to enable gzip compression or trust all server certificates, get the default options and change what is needed:

client := influxdb2.NewClientWithOptions("http://localhost:9999", "my-token",
    influxdb2.DefaultOptions().
        SetUseGZip(true).
        SetTlsConfig(&tls.Config{
            InsecureSkipVerify: true,
        }))
Writes

Client offers two ways of writing, non-blocking and blocking.

Non-blocking write client

The non-blocking write client uses implicit batching. Data is written asynchronously to an underlying buffer and is sent to the server automatically when the buffer reaches the batch size (default 1000) or the flush interval (default 1s) times out. Writes are automatically retried on server back pressure.

This client also offers a synchronous blocking method, Flush(), to ensure that the write buffer is flushed and all pending writes are finished. Always use the client's Close() method to stop background processes.

This write client is recommended for frequent periodic writes.

Example:

package main

import (
	"fmt"
	"github.com/bonitoo-io/influxdb-client-go"
	"math/rand"
	"time"
)

func main() {
    // Create a client and set the batch size to 20
    client := influxdb2.NewClientWithOptions("http://localhost:9999", "my-token",
        influxdb2.DefaultOptions().SetBatchSize(20))
    // Get the non-blocking write client
    writeApi := client.WriteApi("my-org", "my-bucket")
    // Write some points
    for i := 0; i < 100; i++ {
        // Create a point
        p := influxdb2.NewPoint(
            "system",
            map[string]string{
                "id":       fmt.Sprintf("rack_%v", i%10),
                "vendor":   "AWS",
                "hostname": fmt.Sprintf("host_%v", i%100),
            },
            map[string]interface{}{
                "temperature": rand.Float64() * 80.0,
                "disk_free":   rand.Float64() * 1000.0,
                "disk_total":  (i/10 + 1) * 1000000,
                "mem_total":   (i/100 + 1) * 10000000,
                "mem_free":    rand.Uint64(),
            },
            time.Now())
        // Write asynchronously
        writeApi.WritePoint(p)
    }
    // Force all unwritten data to be sent
    writeApi.Flush()
    // Ensure background processes finish
    client.Close()
}
Blocking write client

The blocking write client writes the given point(s) synchronously. There is no implicit batching; a batch is created from the given set of points.

Example:

package main

import (
	"context"
	"fmt"
	"github.com/bonitoo-io/influxdb-client-go"
	"math/rand"
	"time"
)

func main() {
    // Create client
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    // Get the blocking write client
    writeApi := client.WriteApiBlocking("my-org", "my-bucket")
    // Write some points
    for i := 0; i < 100; i++ {
	// create data point
	p := influxdb2.NewPoint(
		"system",
		map[string]string{
			"id":       fmt.Sprintf("rack_%v", i%10),
			"vendor":   "AWS",
			"hostname": fmt.Sprintf("host_%v", i%100),
		},
		map[string]interface{}{
			"temperature": rand.Float64() * 80.0,
			"disk_free":   rand.Float64() * 1000.0,
			"disk_total":  (i/10 + 1) * 1000000,
			"mem_total":   (i/100 + 1) * 10000000,
			"mem_free":    rand.Uint64(),
		},
		time.Now())
	// write synchronously
	err := writeApi.WritePoint(context.Background(), p)
	if err != nil {
	    panic(err)
	}
    }
}
Queries

The query client offers two ways of retrieving query results: a parsed representation in QueryTableResult and a raw result string.

QueryTableResult

QueryTableResult offers a comfortable way to deal with the flux query CSV response. It parses the CSV stream into FluxTableMetadata, FluxColumn and FluxRecord objects for easy reading of the result.

package main

import (
	"context"
	"fmt"
	"github.com/bonitoo-io/influxdb-client-go"
)

func main() {
    // Create client
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    // Get query client
    queryApi := client.QueryApi("my-org")
    // get QueryTableResult
    result, err := queryApi.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
    if err == nil {
	// Iterate over query response
	for result.Next() {
		// Notice when group key has changed
		if result.TableChanged() {
			fmt.Printf("table: %s\n", result.TableMetadata().String())
		}
		// Access data
		fmt.Printf("value: %v\n", result.Record().Value())
	}
	// check for an error
	if result.Err() != nil {
		fmt.Printf("query parsing error: %s\n", result.Err().Error())
	}
    } else {
	panic(err)
    }
}
Raw

QueryRaw() returns a raw, unparsed query result string, so you can process it on your own. The returned CSV format
can be controlled by the third parameter, the query dialect.

package main

import (
	"context"
	"fmt"
	"github.com/bonitoo-io/influxdb-client-go"
)

func main() {
    // Create client
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    // Get query client
    queryApi := client.QueryApi("my-org")
    // Query and get complete result as a string
    // Use default dialect
    result, err := queryApi.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`, influxdb2.DefaultDialect())
    if err == nil {
    	fmt.Println("QueryResult:")
	fmt.Println(result)
    } else {
	panic(err)
    }
}    

Contributing

If you would like to contribute code, you can do so through GitHub by forking the repository and sending a pull request into the master branch.

License

The InfluxDB 2 Go Client is released under the MIT License.

Documentation

Overview

Package influxdb2 provides an API for using the InfluxDB client in Go. It is intended for use with an InfluxDB 2 server.

Index

Constants

const (
	Version = "1.0.0"
)

Variables

This section is empty.

Functions

func DefaultDialect

func DefaultDialect() *domain.Dialect

DefaultDialect returns a flux query Dialect with full annotations (datatype, group, default), a header and a comma as the delimiter

Types

type Error

type Error struct {
	StatusCode int
	Code       string
	Message    string
	Err        error
	RetryAfter uint
}

Error represents an error response from the InfluxDB server or an HTTP error

func NewError

func NewError(err error) *Error

NewError returns a newly created Error initialised with the nested error and default values

func (*Error) Error

func (e *Error) Error() string

Error fulfils the error interface
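
Write and query methods return plain error values. As an illustrative sketch only (this pattern is an assumption, not shown in the README), a returned error might be inspected for server details by type-asserting it to *Error:

package main

import (
	"context"
	"fmt"

	"github.com/bonitoo-io/influxdb-client-go"
)

func main() {
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    writeApi := client.WriteApiBlocking("my-org", "my-bucket")
    err := writeApi.WriteRecord(context.Background(), "stat,unit=temperature avg=23.5")
    if err != nil {
        // Assumption: errors originating from the server may be of type *influxdb2.Error,
        // carrying the HTTP status code, server message and retry hint
        if serverErr, ok := err.(*influxdb2.Error); ok {
            fmt.Printf("status: %d, message: %s, retry after: %d\n",
                serverErr.StatusCode, serverErr.Message, serverErr.RetryAfter)
        } else {
            fmt.Printf("write error: %s\n", err.Error())
        }
    }
    client.Close()
}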

type FluxColumn

type FluxColumn struct {
	// contains filtered or unexported fields
}

FluxColumn holds flux query table column properties

func (*FluxColumn) DataType

func (f *FluxColumn) DataType() string

DataType returns data type of the column

func (*FluxColumn) DefaultValue

func (f *FluxColumn) DefaultValue() string

DefaultValue returns default value of the column

func (*FluxColumn) Index

func (f *FluxColumn) Index() int

Index returns index of the column

func (*FluxColumn) IsGroup

func (f *FluxColumn) IsGroup() bool

IsGroup returns true if the column is a grouping column

func (*FluxColumn) Name

func (f *FluxColumn) Name() string

Name returns name of the column

func (*FluxColumn) SetDataType

func (f *FluxColumn) SetDataType(dataType string)

SetDataType sets data type for the column

func (*FluxColumn) SetDefaultValue

func (f *FluxColumn) SetDefaultValue(defaultValue string)

SetDefaultValue sets default value for the column

func (*FluxColumn) SetGroup

func (f *FluxColumn) SetGroup(group bool)

SetGroup sets the group flag for the column

func (*FluxColumn) SetName

func (f *FluxColumn) SetName(name string)

SetName sets name of the column

func (*FluxColumn) String

func (f *FluxColumn) String() string

String returns FluxColumn string dump

type FluxRecord

type FluxRecord struct {
	// contains filtered or unexported fields
}

FluxRecord represents a row in the flux query result table

func (*FluxRecord) Field

func (r *FluxRecord) Field() string

Field returns the field name

func (*FluxRecord) Measurement

func (r *FluxRecord) Measurement() string

Measurement returns the measurement name of the record

func (*FluxRecord) Start

func (r *FluxRecord) Start() time.Time

Start returns the inclusive lower time bound of all records in the current table

func (*FluxRecord) Stop

func (r *FluxRecord) Stop() time.Time

Stop returns the exclusive upper time bound of all records in the current table

func (*FluxRecord) String

func (r *FluxRecord) String() string

String returns FluxRecord string dump

func (*FluxRecord) Table

func (r *FluxRecord) Table() int

Table returns the index of the table the record belongs to

func (*FluxRecord) Time

func (r *FluxRecord) Time() time.Time

Time returns the time of the record

func (*FluxRecord) Value

func (r *FluxRecord) Value() interface{}

Value returns the actual field value

func (*FluxRecord) ValueByKey

func (r *FluxRecord) ValueByKey(key string) interface{}

ValueByKey returns value for given column key for the record

func (*FluxRecord) Values

func (r *FluxRecord) Values() map[string]interface{}

Values returns map of the values where key is the column name

type FluxTableMetadata

type FluxTableMetadata struct {
	// contains filtered or unexported fields
}

FluxTableMetadata holds flux query result table information represented by a collection of columns. Each new table is introduced by annotations

func (*FluxTableMetadata) AddColumn

func (f *FluxTableMetadata) AddColumn(column *FluxColumn) *FluxTableMetadata

AddColumn adds column definition to table metadata

func (*FluxTableMetadata) Column

func (f *FluxTableMetadata) Column(index int) *FluxColumn

Column returns the flux table column at the given index. It returns nil if the index is out of bounds

func (*FluxTableMetadata) Columns

func (f *FluxTableMetadata) Columns() []*FluxColumn

Columns returns the slice of columns of the flux query result table

func (*FluxTableMetadata) Position

func (f *FluxTableMetadata) Position() int

Position returns position of the table in the flux query result

func (*FluxTableMetadata) String

func (f *FluxTableMetadata) String() string

String returns FluxTableMetadata string dump

type InfluxDBClient

type InfluxDBClient interface {
	// WriteApi returns the asynchronous, non-blocking, Write client.
	WriteApi(org, bucket string) WriteApi
	// WriteApiBlocking returns the synchronous, blocking, Write client.
	WriteApiBlocking(org, bucket string) WriteApiBlocking
	// QueryApi returns Query client
	QueryApi(org string) QueryApi
	// Close ensures all ongoing asynchronous write clients finish
	Close()
	// Options returns the options associated with client
	Options() *Options
	// ServerUrl returns the URL of the server the client talks to
	ServerUrl() string
	// Setup sends a request to initialise a new InfluxDB server with a user, org, bucket and data retention period,
	// and returns details about the newly created entities along with the authorization object.
	// A retention period of zero results in infinite retention.
	Setup(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int) (*domain.OnboardingResponse, error)
	// Ready checks whether the InfluxDB server is running
	Ready(ctx context.Context) (bool, error)
	// contains filtered or unexported methods
}

InfluxDBClient provides an API to communicate with the InfluxDB server. There are two APIs for writing, WriteApi and WriteApiBlocking. WriteApi provides asynchronous, non-blocking methods for writing time series data. WriteApiBlocking provides blocking methods for writing time series data.

func NewClient

func NewClient(serverUrl string, authToken string) InfluxDBClient

NewClient creates an InfluxDBClient for connecting to the given serverUrl with the provided authentication token, using default options. The authentication token can be empty when connecting to a newly installed InfluxDB server that has not been set up yet; in such a case Setup will set the authentication token.
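
A minimal sketch of that flow, assuming a freshly installed server that has not been set up yet (per Setup above, a retention period of zero means infinite retention):

package main

import (
	"context"
	"fmt"

	"github.com/bonitoo-io/influxdb-client-go"
)

func main() {
    // Empty token: the server has not been set up yet
    client := influxdb2.NewClient("http://localhost:9999", "")
    // Setup creates the initial user, org and bucket and sets the authentication token on the client
    _, err := client.Setup(context.Background(), "my-user", "my-password", "my-org", "my-bucket", 0)
    if err != nil {
        panic(err)
    }
    fmt.Println("server onboarded, client is now authorized")
    client.Close()
}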

func NewClientWithOptions

func NewClientWithOptions(serverUrl string, authToken string, options *Options) InfluxDBClient

NewClientWithOptions creates an InfluxDBClient for connecting to the given serverUrl with the provided authentication token, configured with custom Options. The authentication token can be empty when connecting to a newly installed InfluxDB server that has not been set up yet; in such a case Setup will set the authentication token.

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options holds configuration properties for communicating with InfluxDB server

func DefaultOptions

func DefaultOptions() *Options

DefaultOptions returns Options object with default values
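
For illustration, a short sketch (not from the README) chaining several of the setters documented below; the values are arbitrary examples:

package main

import (
	"time"

	"github.com/bonitoo-io/influxdb-client-go"
)

func main() {
    // Arbitrary example values; the defaults are usually sufficient
    options := influxdb2.DefaultOptions().
        SetBatchSize(5000).
        SetFlushInterval(10000). // ms
        SetPrecision(time.Millisecond).
        SetMaxRetries(5).
        SetLogLevel(2) // info
    client := influxdb2.NewClientWithOptions("http://localhost:9999", "my-token", options)
    defer client.Close()
}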

func (*Options) BatchSize

func (o *Options) BatchSize() uint

BatchSize returns size of batch

func (*Options) FlushInterval

func (o *Options) FlushInterval() uint

FlushInterval returns flush interval in ms

func (*Options) LogLevel

func (o *Options) LogLevel() uint

LogLevel returns log level

func (*Options) MaxRetries

func (o *Options) MaxRetries() uint

MaxRetries returns maximum count of retry attempts of failed writes

func (*Options) Precision

func (o *Options) Precision() time.Duration

Precision returns time precision for writes

func (*Options) RetryBufferLimit

func (o *Options) RetryBufferLimit() uint

RetryBufferLimit returns retry buffer limit

func (*Options) RetryInterval

func (o *Options) RetryInterval() uint

RetryInterval returns retry interval in ms

func (*Options) SetBatchSize

func (o *Options) SetBatchSize(batchSize uint) *Options

SetBatchSize sets number of points sent in single request

func (*Options) SetFlushInterval

func (o *Options) SetFlushInterval(flushIntervalMs uint) *Options

SetFlushInterval sets the flush interval in ms, after which the buffer is flushed if it has not already been written

func (*Options) SetLogLevel

func (o *Options) SetLogLevel(logLevel uint) *Options

SetLogLevel sets the level used to filter log messages. Each level also logs all categories below it: 0 - error, 1 - warning, 2 - info, 3 - debug. The debug level also prints the content of written batches

func (*Options) SetMaxRetries

func (o *Options) SetMaxRetries(maxRetries uint) *Options

SetMaxRetries sets maximum count of retry attempts of failed writes

func (*Options) SetPrecision

func (o *Options) SetPrecision(precision time.Duration) *Options

SetPrecision sets the time precision used for write timestamps, in units of duration: time.Nanosecond, time.Microsecond, time.Millisecond or time.Second

func (*Options) SetRetryBufferLimit

func (o *Options) SetRetryBufferLimit(retryBufferLimit uint) *Options

SetRetryBufferLimit sets maximum number of points to keep for retry. Should be multiple of BatchSize.

func (*Options) SetRetryInterval

func (o *Options) SetRetryInterval(retryIntervalMs uint) *Options

SetRetryInterval sets the retry interval in ms, used when the server does not send one

func (*Options) SetTlsConfig

func (o *Options) SetTlsConfig(tlsConfig *tls.Config) *Options

SetTlsConfig sets TLS configuration for secure connection

func (*Options) SetUseGZip

func (o *Options) SetUseGZip(useGZip bool) *Options

SetUseGZip specifies whether to use GZip compression in write requests.

func (*Options) TlsConfig

func (o *Options) TlsConfig() *tls.Config

TlsConfig returns TlsConfig

func (*Options) UseGZip

func (o *Options) UseGZip() bool

UseGZip returns true if write requests are gzipped

type Point

type Point struct {
	// contains filtered or unexported fields
}

Point represents an InfluxDB time series point, holding tags and fields

func NewPoint

func NewPoint(
	measurement string,
	tags map[string]string,
	fields map[string]interface{},
	ts time.Time,
) *Point

NewPoint creates a Point from measurement name, tags, fields and a timestamp.

func NewPointWithMeasurement

func NewPointWithMeasurement(measurement string) *Point

NewPointWithMeasurement creates an empty Point. Use AddTag and AddField to fill the point with data.

func (*Point) AddField

func (m *Point) AddField(k string, v interface{}) *Point

AddField adds a field to a point.

func (*Point) AddTag

func (m *Point) AddTag(k, v string) *Point

AddTag adds a tag to a point.

func (*Point) FieldList

func (m *Point) FieldList() []*lp.Field

FieldList returns a slice containing the fields of a Point.

func (*Point) Name

func (m *Point) Name() string

Name returns the measurement name of a point.

func (*Point) SetTime

func (m *Point) SetTime(timestamp time.Time) *Point

SetTime sets the timestamp for a Point.

func (*Point) SortFields

func (m *Point) SortFields() *Point

SortFields orders the fields of a point alphanumerically by key.

func (*Point) SortTags

func (m *Point) SortTags() *Point

SortTags orders the tags of a point alphanumerically by key. This is just here as a helper, to make it easy to keep tags sorted if you are creating a Point manually.
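
For example, a brief sketch of building a point manually and keeping its tags and fields sorted (the values are illustrative):

package main

import (
	"fmt"
	"time"

	"github.com/bonitoo-io/influxdb-client-go"
)

func main() {
    // Build a point with the fluent API and sort its tags and fields by key
    p := influxdb2.NewPointWithMeasurement("stat").
        AddTag("unit", "temperature").
        AddTag("host", "server01").
        AddField("max", 45.0).
        AddField("avg", 23.2).
        SetTime(time.Now()).
        SortTags().
        SortFields()
    fmt.Println(p.Name(), len(p.TagList()), len(p.FieldList()))
}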

func (*Point) TagList

func (m *Point) TagList() []*lp.Tag

TagList returns a slice containing tags of a Point.

func (*Point) Time

func (m *Point) Time() time.Time

Time returns the timestamp of a Point.

type QueryApi

type QueryApi interface {
	// QueryRaw executes flux query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
	QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error)
	// Query executes flux query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
	Query(ctx context.Context, query string) (*QueryTableResult, error)
}

QueryApi provides methods for synchronously performing flux queries against an InfluxDB server

type QueryTableResult

type QueryTableResult struct {
	io.Closer
	// contains filtered or unexported fields
}

QueryTableResult parses the streamed flux query response into structures representing flux table parts. Walking through the result is done by repeatedly calling Next() until it returns false. Actual flux table info (columns with names, data types, etc.) is returned by the TableMetadata() method. Data is acquired by the Record() method. A premature end can be caused by an error, so when Next() returns false, check Err() for an error.
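
As a sketch of this pattern (assuming the standard flux result columns such as _value), individual columns of each record can also be read via ValueByKey:

package main

import (
	"context"
	"fmt"

	"github.com/bonitoo-io/influxdb-client-go"
)

func main() {
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    queryApi := client.QueryApi("my-org")
    result, err := queryApi.Query(context.Background(), `from(bucket:"my-bucket") |> range(start: -1h)`)
    if err != nil {
        panic(err)
    }
    for result.Next() {
        if result.TableChanged() {
            fmt.Printf("table %d: %s\n", result.TablePosition(), result.TableMetadata().String())
        }
        // Read a single column of the current record by its name
        fmt.Printf("%v: %v=%v\n", result.Record().Time(), result.Record().Field(), result.Record().ValueByKey("_value"))
    }
    if result.Err() != nil {
        fmt.Printf("query error: %s\n", result.Err().Error())
    }
    client.Close()
}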

func (*QueryTableResult) Err

func (q *QueryTableResult) Err() error

Err returns an error raised during flux query response parsing

func (*QueryTableResult) Next

func (q *QueryTableResult) Next() bool

Next advances to the next row in the query result. The first time it is called, Next also creates the table metadata. The actual parsed row is available through the Record() function. It returns false at the end of the result or on an error, otherwise true.

func (*QueryTableResult) Record

func (q *QueryTableResult) Record() *FluxRecord

Record returns the last parsed flux table data row. Use Record's methods to access values and row properties.

func (*QueryTableResult) TableChanged

func (q *QueryTableResult) TableChanged() bool

TableChanged returns true if the last call of Next() also found a new result table. Table information is available via the TableMetadata method.

func (*QueryTableResult) TableMetadata

func (q *QueryTableResult) TableMetadata() *FluxTableMetadata

TableMetadata returns the current flux table metadata

func (*QueryTableResult) TablePosition

func (q *QueryTableResult) TablePosition() int

TablePosition returns the position of the current flux table in the result. Each new table is introduced by annotations

type RequestCallback

type RequestCallback func(req *http.Request)

Http operation callbacks

type ResponseCallback

type ResponseCallback func(req *http.Response) error

type WriteApi

type WriteApi interface {
	// WriteRecord asynchronously writes a line protocol record into the bucket.
	// WriteRecord adds the record to a buffer which is sent in the background when it reaches the batch size.
	// A blocking alternative is available in the WriteApiBlocking interface
	WriteRecord(line string)
	// WritePoint asynchronously writes a Point into the bucket.
	// WritePoint adds the Point to a buffer which is sent in the background when it reaches the batch size.
	// A blocking alternative is available in the WriteApiBlocking interface
	WritePoint(point *Point)
	// Flush forces all pending writes from the buffer to be sent
	Flush()
	// Close flushes all pending writes and stops async processes. After this the write client cannot be used
	Close()
	// Errors returns a channel for reading errors which occur during async writes
	Errors() <-chan error
}

WriteApi is a Write client interface with non-blocking methods for writing time series data asynchronously, in batches, into an InfluxDB server.
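
The Errors() channel is not shown in the README examples above; a minimal sketch of how it might be consumed alongside asynchronous writes:

package main

import (
	"fmt"
	"time"

	"github.com/bonitoo-io/influxdb-client-go"
)

func main() {
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    writeApi := client.WriteApi("my-org", "my-bucket")
    // Read asynchronous write errors on a separate goroutine
    errorsCh := writeApi.Errors()
    go func() {
        for err := range errorsCh {
            fmt.Printf("write error: %s\n", err.Error())
        }
    }()
    // Write a point asynchronously
    p := influxdb2.NewPointWithMeasurement("stat").
        AddTag("unit", "temperature").
        AddField("avg", 24.5).
        SetTime(time.Now())
    writeApi.WritePoint(p)
    // Flush pending writes and stop background processes
    writeApi.Flush()
    client.Close()
}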

type WriteApiBlocking

type WriteApiBlocking interface {
	// WriteRecord writes line protocol record(s) into bucket.
	// WriteRecord writes without implicit batching; a batch is created from the given number of records.
	// A non-blocking alternative is available in the WriteApi interface
	WriteRecord(ctx context.Context, line ...string) error
	// WritePoint writes data point(s) into bucket.
	// WritePoint writes without implicit batching; a batch is created from the given number of points.
	// A non-blocking alternative is available in the WriteApi interface
	WritePoint(ctx context.Context, point ...*Point) error
}

WriteApiBlocking offers blocking methods for writing time series data synchronously into an InfluxDB server.
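
Both methods are variadic, so several records or points can be written as one batch; a short sketch:

package main

import (
	"context"

	"github.com/bonitoo-io/influxdb-client-go"
)

func main() {
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    writeApi := client.WriteApiBlocking("my-org", "my-bucket")
    // Both records form a single batch and are written in one request
    err := writeApi.WriteRecord(context.Background(),
        "stat,unit=temperature avg=23.5",
        "stat,unit=temperature avg=24.1")
    if err != nil {
        panic(err)
    }
    client.Close()
}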

Directories

Path Synopsis
domain	Package domain provides primitives to interact with the openapi HTTP API.
internal
log
