api

package
Version: v2.8.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2022 License: MIT Imports: 27 Imported by: 108

Documentation

Overview

Package api provides clients for InfluxDB server APIs.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultDialect

func DefaultDialect() *domain.Dialect

DefaultDialect return flux query Dialect with full annotations (datatype, group, default), header and comma char as a delimiter

Types

type AuthorizationsAPI

type AuthorizationsAPI interface {
	// GetAuthorizations returns all authorizations
	GetAuthorizations(ctx context.Context) (*[]domain.Authorization, error)
	// FindAuthorizationsByUserName returns all authorizations for given userName
	FindAuthorizationsByUserName(ctx context.Context, userName string) (*[]domain.Authorization, error)
	// FindAuthorizationsByUserID returns all authorizations for given userID
	FindAuthorizationsByUserID(ctx context.Context, userID string) (*[]domain.Authorization, error)
	// FindAuthorizationsByOrgName returns all authorizations for given organization name
	FindAuthorizationsByOrgName(ctx context.Context, orgName string) (*[]domain.Authorization, error)
	// FindAuthorizationsByOrgID returns all authorizations for given organization id
	FindAuthorizationsByOrgID(ctx context.Context, orgID string) (*[]domain.Authorization, error)
	// CreateAuthorization creates new authorization
	CreateAuthorization(ctx context.Context, authorization *domain.Authorization) (*domain.Authorization, error)
	// CreateAuthorizationWithOrgID creates new authorization with given permissions scoped to given orgID
	CreateAuthorizationWithOrgID(ctx context.Context, orgID string, permissions []domain.Permission) (*domain.Authorization, error)
	// UpdateAuthorizationStatus updates status of authorization
	UpdateAuthorizationStatus(ctx context.Context, authorization *domain.Authorization, status domain.AuthorizationUpdateRequestStatus) (*domain.Authorization, error)
	// UpdateAuthorizationStatusWithID updates status of authorization with authID
	UpdateAuthorizationStatusWithID(ctx context.Context, authID string, status domain.AuthorizationUpdateRequestStatus) (*domain.Authorization, error)
	// DeleteAuthorization deletes authorization
	DeleteAuthorization(ctx context.Context, authorization *domain.Authorization) error
	// DeleteAuthorization deletes authorization with authID
	DeleteAuthorizationWithID(ctx context.Context, authID string) error
}

AuthorizationsAPI provides methods for organizing Authorization in a InfluxDB server

Example
package main

import (
	"context"
	"fmt"

	"github.com/influxdata/influxdb-client-go/v2/domain"
	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")

	// Find user to grant permission
	user, err := client.UsersAPI().FindUserByName(context.Background(), "user-01")
	if err != nil {
		panic(err)
	}

	// Find organization
	org, err := client.OrganizationsAPI().FindOrganizationByName(context.Background(), "my-org")
	if err != nil {
		panic(err)
	}

	// create write permission for buckets
	permissionWrite := &domain.Permission{
		Action: domain.PermissionActionWrite,
		Resource: domain.Resource{
			Type: domain.ResourceTypeBuckets,
		},
	}

	// create read permission for buckets
	permissionRead := &domain.Permission{
		Action: domain.PermissionActionRead,
		Resource: domain.Resource{
			Type: domain.ResourceTypeBuckets,
		},
	}

	// group permissions
	permissions := []domain.Permission{*permissionWrite, *permissionRead}

	// create authorization object using info above
	auth := &domain.Authorization{
		OrgID:       org.Id,
		Permissions: &permissions,
		UserID:      user.Id,
	}

	// grant permission and create token
	authCreated, err := client.AuthorizationsAPI().CreateAuthorization(context.Background(), auth)
	if err != nil {
		panic(err)
	}
	// Use token
	fmt.Println("Token: ", *authCreated.Token)
	// Ensures background processes finishes
	client.Close()
}
Output:

func NewAuthorizationsAPI

func NewAuthorizationsAPI(apiClient *domain.ClientWithResponses) AuthorizationsAPI

NewAuthorizationsAPI creates new instance of AuthorizationsAPI

type BucketsAPI

type BucketsAPI interface {
	// GetBuckets returns all buckets.
	// GetBuckets supports PagingOptions: Offset, Limit, After. Empty pagingOptions means the default paging (first 20 results).
	GetBuckets(ctx context.Context, pagingOptions ...PagingOption) (*[]domain.Bucket, error)
	// FindBucketByName returns a bucket found using bucketName.
	FindBucketByName(ctx context.Context, bucketName string) (*domain.Bucket, error)
	// FindBucketByID returns a bucket found using bucketID.
	FindBucketByID(ctx context.Context, bucketID string) (*domain.Bucket, error)
	// FindBucketsByOrgID returns buckets belonging to the organization with ID orgID.
	// FindBucketsByOrgID supports PagingOptions: Offset, Limit, After. Empty pagingOptions means the default paging (first 20 results).
	FindBucketsByOrgID(ctx context.Context, orgID string, pagingOptions ...PagingOption) (*[]domain.Bucket, error)
	// FindBucketsByOrgName returns buckets belonging to the organization with name orgName, with the specified paging. Empty pagingOptions means the default paging (first 20 results).
	FindBucketsByOrgName(ctx context.Context, orgName string, pagingOptions ...PagingOption) (*[]domain.Bucket, error)
	// CreateBucket creates a new bucket.
	CreateBucket(ctx context.Context, bucket *domain.Bucket) (*domain.Bucket, error)
	// CreateBucketWithName creates a new bucket with bucketName in organization org, with retention specified in rules. Empty rules means infinite retention.
	CreateBucketWithName(ctx context.Context, org *domain.Organization, bucketName string, rules ...domain.RetentionRule) (*domain.Bucket, error)
	// CreateBucketWithNameWithID creates a new bucket with bucketName in organization with orgID, with retention specified in rules. Empty rules means infinite retention.
	CreateBucketWithNameWithID(ctx context.Context, orgID, bucketName string, rules ...domain.RetentionRule) (*domain.Bucket, error)
	// UpdateBucket updates a bucket.
	UpdateBucket(ctx context.Context, bucket *domain.Bucket) (*domain.Bucket, error)
	// DeleteBucket deletes a bucket.
	DeleteBucket(ctx context.Context, bucket *domain.Bucket) error
	// DeleteBucketWithID deletes a bucket with bucketID.
	DeleteBucketWithID(ctx context.Context, bucketID string) error
	// GetMembers returns members of a bucket.
	GetMembers(ctx context.Context, bucket *domain.Bucket) (*[]domain.ResourceMember, error)
	// GetMembersWithID returns members of a bucket with bucketID.
	GetMembersWithID(ctx context.Context, bucketID string) (*[]domain.ResourceMember, error)
	// AddMember adds a member to a bucket.
	AddMember(ctx context.Context, bucket *domain.Bucket, user *domain.User) (*domain.ResourceMember, error)
	// AddMemberWithID adds a member with id memberID to a bucket with bucketID.
	AddMemberWithID(ctx context.Context, bucketID, memberID string) (*domain.ResourceMember, error)
	// RemoveMember removes a member from a bucket.
	RemoveMember(ctx context.Context, bucket *domain.Bucket, user *domain.User) error
	// RemoveMemberWithID removes a member with id memberID from a bucket with bucketID.
	RemoveMemberWithID(ctx context.Context, bucketID, memberID string) error
	// GetOwners returns owners of a bucket.
	GetOwners(ctx context.Context, bucket *domain.Bucket) (*[]domain.ResourceOwner, error)
	// GetOwnersWithID returns owners of a bucket with bucketID.
	GetOwnersWithID(ctx context.Context, bucketID string) (*[]domain.ResourceOwner, error)
	// AddOwner adds an owner to a bucket.
	AddOwner(ctx context.Context, bucket *domain.Bucket, user *domain.User) (*domain.ResourceOwner, error)
	// AddOwnerWithID adds an owner with id memberID to a bucket with bucketID.
	AddOwnerWithID(ctx context.Context, bucketID, memberID string) (*domain.ResourceOwner, error)
	// RemoveOwner removes an owner from a bucket.
	RemoveOwner(ctx context.Context, bucket *domain.Bucket, user *domain.User) error
	// RemoveOwnerWithID removes a member with id memberID from a bucket with bucketID.
	RemoveOwnerWithID(ctx context.Context, bucketID, memberID string) error
}

BucketsAPI provides methods for managing Buckets in a InfluxDB server.

Example
package main

import (
	"context"

	"github.com/influxdata/influxdb-client-go/v2/domain"
	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")

	ctx := context.Background()
	// Get Buckets API client
	bucketsAPI := client.BucketsAPI()

	// Get organization that will own new bucket
	org, err := client.OrganizationsAPI().FindOrganizationByName(ctx, "my-org")
	if err != nil {
		panic(err)
	}
	// Create  a bucket with 1 day retention policy
	bucket, err := bucketsAPI.CreateBucketWithName(ctx, org, "bucket-sensors", domain.RetentionRule{EverySeconds: 3600 * 24})
	if err != nil {
		panic(err)
	}

	// Update description of the bucket
	desc := "Bucket for sensor data"
	bucket.Description = &desc
	bucket, err = bucketsAPI.UpdateBucket(ctx, bucket)
	if err != nil {
		panic(err)
	}

	// Close the client
	client.Close()
}
Output:

func NewBucketsAPI

func NewBucketsAPI(apiClient *domain.ClientWithResponses) BucketsAPI

NewBucketsAPI creates new instance of BucketsAPI

type DeleteAPI

type DeleteAPI interface {
	// Delete deletes series selected by by the time range specified by start and stop arguments and optional predicate string from the bucket bucket belonging to the organization org.
	Delete(ctx context.Context, org *domain.Organization, bucket *domain.Bucket, start, stop time.Time, predicate string) error
	// Delete deletes series selected by by the time range specified by start and stop arguments and optional predicate string from the bucket with ID bucketID belonging to the organization with ID orgID.
	DeleteWithID(ctx context.Context, orgID, bucketID string, start, stop time.Time, predicate string) error
	// Delete deletes series selected by by the time range specified by start and stop arguments and optional predicate string from the bucket with name bucketName belonging to the organization with name orgName.
	DeleteWithName(ctx context.Context, orgName, bucketName string, start, stop time.Time, predicate string) error
}

DeleteAPI provides methods for deleting time series data from buckets. Deleted series are selected by the time range specified by start and stop arguments and optional predicate string which contains condition for selecting data for deletion, such as:

tag1="value1" and (tag2="value2" and tag3!="value3")

Empty predicate string means all data from the given time range will be deleted. See https://v2.docs.influxdata.com/v2.0/reference/syntax/delete-predicate/ for more info about predicate syntax.

Example
package main

import (
	"context"
	"time"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")

	ctx := context.Background()
	// Get Delete API client
	deleteAPI := client.DeleteAPI()
	// Delete last hour data with tag b = static
	err := deleteAPI.DeleteWithName(ctx, "org", "my-bucket", time.Now().Add(-time.Hour), time.Now(), "b=static")
	if err != nil {
		panic(err)
	}

	// Close the client
	client.Close()
}
Output:

func NewDeleteAPI

func NewDeleteAPI(apiClient *domain.ClientWithResponses) DeleteAPI

NewDeleteAPI creates new instance of DeleteAPI

type LabelsAPI

type LabelsAPI interface {
	// GetLabels returns all labels.
	GetLabels(ctx context.Context) (*[]domain.Label, error)
	// FindLabelsByOrg returns labels belonging to organization org.
	FindLabelsByOrg(ctx context.Context, org *domain.Organization) (*[]domain.Label, error)
	// FindLabelsByOrgID returns labels belonging to organization with id orgID.
	FindLabelsByOrgID(ctx context.Context, orgID string) (*[]domain.Label, error)
	// FindLabelByID returns a label with labelID.
	FindLabelByID(ctx context.Context, labelID string) (*domain.Label, error)
	// FindLabelByName returns a label with name labelName under an organization orgID.
	FindLabelByName(ctx context.Context, orgID, labelName string) (*domain.Label, error)
	// CreateLabel creates a new label.
	CreateLabel(ctx context.Context, label *domain.LabelCreateRequest) (*domain.Label, error)
	// CreateLabelWithName creates a new label with label labelName and properties, under the organization org.
	// Properties example: {"color": "ffb3b3", "description": "this is a description"}.
	CreateLabelWithName(ctx context.Context, org *domain.Organization, labelName string, properties map[string]string) (*domain.Label, error)
	// CreateLabelWithName creates a new label with label labelName and properties, under the organization with id orgID.
	// Properties example: {"color": "ffb3b3", "description": "this is a description"}.
	CreateLabelWithNameWithID(ctx context.Context, orgID, labelName string, properties map[string]string) (*domain.Label, error)
	// UpdateLabel updates the label.
	// Properties can be removed by sending an update with an empty value.
	UpdateLabel(ctx context.Context, label *domain.Label) (*domain.Label, error)
	// DeleteLabelWithID deletes a label with labelID.
	DeleteLabelWithID(ctx context.Context, labelID string) error
	// DeleteLabel deletes a label.
	DeleteLabel(ctx context.Context, label *domain.Label) error
}

LabelsAPI provides methods for managing labels in a InfluxDB server.

Example
package main

import (
	"context"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")

	ctx := context.Background()
	// Get Labels API client
	labelsAPI := client.LabelsAPI()
	// Get Organizations API client
	orgsAPI := client.OrganizationsAPI()

	// Get organization that will own label
	myorg, err := orgsAPI.FindOrganizationByName(ctx, "my-org")
	if err != nil {
		panic(err)
	}

	labelName := "Active State"
	props := map[string]string{"color": "33ffdd", "description": "Marks org active"}
	label, err := labelsAPI.CreateLabelWithName(ctx, myorg, labelName, props)
	if err != nil {
		panic(err)
	}

	// Change color property
	label.Properties.AdditionalProperties = map[string]string{"color": "ff1122"}
	label, err = labelsAPI.UpdateLabel(ctx, label)
	if err != nil {
		panic(err)
	}

	// Close the client
	client.Close()
}
Output:

func NewLabelsAPI

func NewLabelsAPI(apiClient *domain.ClientWithResponses) LabelsAPI

NewLabelsAPI creates new instance of LabelsAPI

type OrganizationsAPI

type OrganizationsAPI interface {
	// GetOrganizations returns all organizations.
	// GetOrganizations supports PagingOptions: Offset, Limit, Descending
	GetOrganizations(ctx context.Context, pagingOptions ...PagingOption) (*[]domain.Organization, error)
	// FindOrganizationByName returns an organization found using orgName.
	FindOrganizationByName(ctx context.Context, orgName string) (*domain.Organization, error)
	// FindOrganizationByID returns an organization found using orgID.
	FindOrganizationByID(ctx context.Context, orgID string) (*domain.Organization, error)
	// FindOrganizationsByUserID returns organizations an user with userID belongs to.
	// FindOrganizationsByUserID supports PagingOptions: Offset, Limit, Descending
	FindOrganizationsByUserID(ctx context.Context, userID string, pagingOptions ...PagingOption) (*[]domain.Organization, error)
	// CreateOrganization creates new organization.
	CreateOrganization(ctx context.Context, org *domain.Organization) (*domain.Organization, error)
	// CreateOrganizationWithName creates new organization with orgName and with status active.
	CreateOrganizationWithName(ctx context.Context, orgName string) (*domain.Organization, error)
	// UpdateOrganization updates organization.
	UpdateOrganization(ctx context.Context, org *domain.Organization) (*domain.Organization, error)
	// DeleteOrganization deletes an organization.
	DeleteOrganization(ctx context.Context, org *domain.Organization) error
	// DeleteOrganizationWithID deletes an organization with orgID.
	DeleteOrganizationWithID(ctx context.Context, orgID string) error
	// GetMembers returns members of an organization.
	GetMembers(ctx context.Context, org *domain.Organization) (*[]domain.ResourceMember, error)
	// GetMembersWithID returns members of an organization with orgID.
	GetMembersWithID(ctx context.Context, orgID string) (*[]domain.ResourceMember, error)
	// AddMember adds a member to an organization.
	AddMember(ctx context.Context, org *domain.Organization, user *domain.User) (*domain.ResourceMember, error)
	// AddMemberWithID adds a member with id memberID to an organization with orgID.
	AddMemberWithID(ctx context.Context, orgID, memberID string) (*domain.ResourceMember, error)
	// RemoveMember removes a member from an organization.
	RemoveMember(ctx context.Context, org *domain.Organization, user *domain.User) error
	// RemoveMemberWithID removes a member with id memberID from an organization with orgID.
	RemoveMemberWithID(ctx context.Context, orgID, memberID string) error
	// GetOwners returns owners of an organization.
	GetOwners(ctx context.Context, org *domain.Organization) (*[]domain.ResourceOwner, error)
	// GetOwnersWithID returns owners of an organization with orgID.
	GetOwnersWithID(ctx context.Context, orgID string) (*[]domain.ResourceOwner, error)
	// AddOwner adds an owner to an organization.
	AddOwner(ctx context.Context, org *domain.Organization, user *domain.User) (*domain.ResourceOwner, error)
	// AddOwnerWithID adds an owner with id memberID to an organization with orgID.
	AddOwnerWithID(ctx context.Context, orgID, memberID string) (*domain.ResourceOwner, error)
	// RemoveOwner removes an owner from an organization.
	RemoveOwner(ctx context.Context, org *domain.Organization, user *domain.User) error
	// RemoveOwnerWithID removes an owner with id memberID from an organization with orgID.
	RemoveOwnerWithID(ctx context.Context, orgID, memberID string) error
}

OrganizationsAPI provides methods for managing Organizations in a InfluxDB server.

Example
package main

import (
	"context"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")

	// Get Organizations API client
	orgAPI := client.OrganizationsAPI()

	// Create new organization
	org, err := orgAPI.CreateOrganizationWithName(context.Background(), "org-2")
	if err != nil {
		panic(err)
	}

	orgDescription := "My second org "
	org.Description = &orgDescription

	org, err = orgAPI.UpdateOrganization(context.Background(), org)
	if err != nil {
		panic(err)
	}

	// Find user to set owner
	user, err := client.UsersAPI().FindUserByName(context.Background(), "user-01")
	if err != nil {
		panic(err)
	}

	// Add another owner (first owner is the one who create organization
	_, err = orgAPI.AddOwner(context.Background(), org, user)
	if err != nil {
		panic(err)
	}

	// Create new user to add to org
	newUser, err := client.UsersAPI().CreateUserWithName(context.Background(), "user-02")
	if err != nil {
		panic(err)
	}

	// Add new user to organization
	_, err = orgAPI.AddMember(context.Background(), org, newUser)
	if err != nil {
		panic(err)
	}
	// Ensures background processes finishes
	client.Close()
}
Output:

func NewOrganizationsAPI

func NewOrganizationsAPI(apiClient *domain.ClientWithResponses) OrganizationsAPI

NewOrganizationsAPI creates new instance of OrganizationsAPI

type Paging

type Paging struct {
	// contains filtered or unexported fields
}

Paging holds pagination parameters for various Get* functions of InfluxDB 2 API Not the all options are usable for some Get* functions

type PagingOption

type PagingOption func(p *Paging)

PagingOption is the function type for applying paging option

func PagingWithAfter added in v2.1.0

func PagingWithAfter(after string) PagingOption

PagingWithAfter set after option - the last resource ID from which to seek from (but not including). This is to be used instead of `offset`.

func PagingWithDescending

func PagingWithDescending(descending bool) PagingOption

PagingWithDescending changes sorting direction

func PagingWithLimit

func PagingWithLimit(limit int) PagingOption

PagingWithLimit sets limit option - maximum number of items returned.

func PagingWithOffset

func PagingWithOffset(offset int) PagingOption

PagingWithOffset set starting offset for returning items. Default 0.

func PagingWithSortBy

func PagingWithSortBy(sortBy string) PagingOption

PagingWithSortBy sets field name which should be used for sorting

type QueryAPI

type QueryAPI interface {
	// QueryRaw executes flux query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
	QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error)
	// QueryRawWithParams executes flux parametrized query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
	QueryRawWithParams(ctx context.Context, query string, dialect *domain.Dialect, params interface{}) (string, error)
	// Query executes flux query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
	Query(ctx context.Context, query string) (*QueryTableResult, error)
	// QueryWithParams executes flux parametrized query  on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
	QueryWithParams(ctx context.Context, query string, params interface{}) (*QueryTableResult, error)
}

QueryAPI provides methods for performing synchronously flux query against InfluxDB server.

Flux query can contain reference to parameters, which must be passed via queryParams. it can be a struct or map. Param values can be only simple types or time.Time. The name of a struct field or a map key (must be a string) will be a param name. The name of the parameter represented by a struct field can be specified by JSON annotation:

type Condition struct {

    Start  time.Time  `json:"start"`
    Field  string     `json:"field"`
    Value  float64    `json:"value"`
	}

 Parameters are then accessed via the Flux params object:

 query:= `from(bucket: "environment")
		|> range(start: time(v: params.start))
		|> filter(fn: (r) => r._measurement == "air")
		|> filter(fn: (r) => r._field == params.field)
		|> filter(fn: (r) => r._value > params.value)`
Example (Query)
package main

import (
	"context"
	"fmt"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	// Get query client
	queryAPI := client.QueryAPI("my-org")
	// get QueryTableResult
	result, err := queryAPI.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
	if err == nil {
		// Iterate over query response
		for result.Next() {
			// Notice when group key has changed
			if result.TableChanged() {
				fmt.Printf("table: %s\n", result.TableMetadata().String())
			}
			// Access data
			fmt.Printf("value: %v\n", result.Record().Value())
		}
		// check for an error
		if result.Err() != nil {
			fmt.Printf("query parsing error: %s\n", result.Err().Error())
		}
	} else {
		panic(err)
	}
	// Ensures background processes finishes
	client.Close()
}
Output:

Example (QueryRaw)
package main

import (
	"context"
	"fmt"

	"github.com/influxdata/influxdb-client-go/v2/api"
	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	// Get query client
	queryAPI := client.QueryAPI("my-org")
	// Query and get complete result as a string
	// Use default dialect
	result, err := queryAPI.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`, api.DefaultDialect())
	if err == nil {
		fmt.Println("QueryResult:")
		fmt.Println(result)
	} else {
		panic(err)
	}
	// Ensures background processes finishes
	client.Close()
}
Output:

Example (QueryWithParams)
package main

import (
	"context"
	"fmt"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	// Get query client
	queryAPI := client.QueryAPI("my-org")
	// Define parameters
	parameters := struct {
		Start string  `json:"start"`
		Field string  `json:"field"`
		Value float64 `json:"value"`
	}{
		"-1h",
		"temperature",
		25,
	}
	// Query with parameters
	query := `from(bucket:"my-bucket")
				|> range(start: duration(params.start)) 
				|> filter(fn: (r) => r._measurement == "stat")
				|> filter(fn: (r) => r._field == params.field)
				|> filter(fn: (r) => r._value > params.value)`

	// Get result
	result, err := queryAPI.QueryWithParams(context.Background(), query, parameters)
	if err == nil {
		// Iterate over query response
		for result.Next() {
			// Notice when group key has changed
			if result.TableChanged() {
				fmt.Printf("table: %s\n", result.TableMetadata().String())
			}
			// Access data
			fmt.Printf("value: %v\n", result.Record().Value())
		}
		// check for an error
		if result.Err() != nil {
			fmt.Printf("query parsing error: %s\n", result.Err().Error())
		}
	} else {
		panic(err)
	}
	// Ensures background processes finishes
	client.Close()
}
Output:

func NewQueryAPI

func NewQueryAPI(org string, service http2.Service) QueryAPI

NewQueryAPI returns new query client for querying buckets belonging to org

type QueryTableResult

type QueryTableResult struct {
	io.Closer
	// contains filtered or unexported fields
}

QueryTableResult parses streamed flux query response into structures representing flux table parts Walking though the result is done by repeatedly calling Next() until returns false. Actual flux table info (columns with names, data types, etc) is returned by TableMetadata() method. Data are acquired by Record() method. Preliminary end can be caused by an error, so when Next() return false, check Err() for an error

func NewQueryTableResult added in v2.8.0

func NewQueryTableResult(rawResponse io.ReadCloser) *QueryTableResult

NewQueryTableResult returns new QueryTableResult

func (*QueryTableResult) Close added in v2.4.0

func (q *QueryTableResult) Close() error

Close reads remaining data and closes underlying Closer

func (*QueryTableResult) Err

func (q *QueryTableResult) Err() error

Err returns an error raised during flux query response parsing

func (*QueryTableResult) Next

func (q *QueryTableResult) Next() bool

Next advances to next row in query result. During the first time it is called, Next creates also table metadata Actual parsed row is available through Record() function Returns false in case of end or an error, otherwise true

func (*QueryTableResult) Record

func (q *QueryTableResult) Record() *query.FluxRecord

Record returns last parsed flux table data row Use Record methods to access value and row properties

func (*QueryTableResult) TableChanged

func (q *QueryTableResult) TableChanged() bool

TableChanged returns true if last call of Next() found also new result table Table information is available via TableMetadata method

func (*QueryTableResult) TableMetadata

func (q *QueryTableResult) TableMetadata() *query.FluxTableMetadata

TableMetadata returns actual flux table metadata

func (*QueryTableResult) TablePosition

func (q *QueryTableResult) TablePosition() int

TablePosition returns actual flux table position in the result, or -1 if no table was found yet Each new table is introduced by an annotation in csv

type RunFilter added in v2.2.0

type RunFilter struct {
	// Return runs after a specified ID.
	After string
	// The number of runs to return.
	// Default 100, minimum 1, maximum 500.
	Limit int
	// Filter runs to those scheduled before this time.
	BeforeTime time.Time
	// Filter runs to those scheduled after this time.
	AfterTime time.Time
}

RunFilter defines filtering options for FindRun* functions.

type TaskFilter added in v2.2.0

type TaskFilter struct {
	// Returns task with a specific name
	Name string
	// Filter tasks to a specific organization name.
	OrgName string
	// Filter tasks to a specific organization ID.
	OrgID string
	// Filter tasks to a specific user ID.
	User string
	// Filter tasks by a status--"inactive" or "active".
	Status domain.TaskStatusType
	// Return tasks after a specified ID.
	After string
	// The number of tasks to return.
	// Default 100, minimum: 1, maximum 500
	Limit int
}

TaskFilter defines filtering options for FindTasks functions.

type TasksAPI added in v2.2.0

type TasksAPI interface {
	// FindTasks retrieves tasks according to the filter. More fields can be applied. Filter can be nil.
	FindTasks(ctx context.Context, filter *TaskFilter) ([]domain.Task, error)
	// GetTask retrieves a refreshed instance of task.
	GetTask(ctx context.Context, task *domain.Task) (*domain.Task, error)
	// GetTaskByID retrieves a task found using taskID.
	GetTaskByID(ctx context.Context, taskID string) (*domain.Task, error)
	// CreateTask creates a new task according the the task object.
	// It copies OrgId, Name, Description, Flux, Status and Every or Cron properties. Every and Cron are mutually exclusive.
	// Every has higher priority.
	CreateTask(ctx context.Context, task *domain.Task) (*domain.Task, error)
	// CreateTaskWithEvery creates a new task with with the name, flux script and every repetition setting, in the org orgID.
	// Every holds duration values.
	CreateTaskWithEvery(ctx context.Context, name, flux, every, orgID string) (*domain.Task, error)
	// CreateTaskWithCron creates a new task with with the name, flux script and cron repetition setting, in the org orgID
	// Cron holds cron-like setting, e.g. once an hour at beginning of the hour "0 * * * *".
	CreateTaskWithCron(ctx context.Context, name, flux, cron, orgID string) (*domain.Task, error)
	// UpdateTask updates a task.
	// It copies Description, Flux, Status, Offset and Every or Cron properties. Every and Cron are mutually exclusive.
	// Every has higher priority.
	UpdateTask(ctx context.Context, task *domain.Task) (*domain.Task, error)
	// DeleteTask deletes a task.
	DeleteTask(ctx context.Context, task *domain.Task) error
	// DeleteTaskWithID deletes a task with taskID.
	DeleteTaskWithID(ctx context.Context, taskID string) error
	// FindMembers retrieves members of a task.
	FindMembers(ctx context.Context, task *domain.Task) ([]domain.ResourceMember, error)
	// FindMembersWithID retrieves members of a task with taskID.
	FindMembersWithID(ctx context.Context, taskID string) ([]domain.ResourceMember, error)
	// AddMember adds a member to a task.
	AddMember(ctx context.Context, task *domain.Task, user *domain.User) (*domain.ResourceMember, error)
	// AddMemberWithID adds a member with id memberID to a task with taskID.
	AddMemberWithID(ctx context.Context, taskID, memberID string) (*domain.ResourceMember, error)
	// RemoveMember removes a member from a task.
	RemoveMember(ctx context.Context, task *domain.Task, user *domain.User) error
	// RemoveMemberWithID removes a member with id memberID from a task with taskID.
	RemoveMemberWithID(ctx context.Context, taskID, memberID string) error
	// FindOwners retrieves owners of a task.
	FindOwners(ctx context.Context, task *domain.Task) ([]domain.ResourceOwner, error)
	// FindOwnersWithID retrieves owners of a task with taskID.
	FindOwnersWithID(ctx context.Context, taskID string) ([]domain.ResourceOwner, error)
	// AddOwner adds an owner to a task.
	AddOwner(ctx context.Context, task *domain.Task, user *domain.User) (*domain.ResourceOwner, error)
	// AddOwnerWithID adds an owner with id memberID to a task with taskID.
	AddOwnerWithID(ctx context.Context, taskID, memberID string) (*domain.ResourceOwner, error)
	// RemoveOwner removes an owner from a task.
	RemoveOwner(ctx context.Context, task *domain.Task, user *domain.User) error
	// RemoveOwnerWithID removes a member with id memberID from a task with taskID.
	RemoveOwnerWithID(ctx context.Context, taskID, memberID string) error
	// FindRuns retrieves a task runs according the filter. More fields can be applied. Filter can be nil.
	FindRuns(ctx context.Context, task *domain.Task, filter *RunFilter) ([]domain.Run, error)
	// FindRunsWithID retrieves runs of a task with taskID according the filter. More fields can be applied. Filter can be nil.
	FindRunsWithID(ctx context.Context, taskID string, filter *RunFilter) ([]domain.Run, error)
	// GetRun retrieves a refreshed instance if a task run.
	GetRun(ctx context.Context, run *domain.Run) (*domain.Run, error)
	// GetRunByID retrieves a specific task run by taskID and runID
	GetRunByID(ctx context.Context, taskID, runID string) (*domain.Run, error)
	// FindRunLogs return all log events for a task run.
	FindRunLogs(ctx context.Context, run *domain.Run) ([]domain.LogEvent, error)
	// FindRunLogsWithID return all log events for a run with runID of a task with taskID.
	FindRunLogsWithID(ctx context.Context, taskID, runID string) ([]domain.LogEvent, error)
	// RunManually manually start a run of the task now, overriding the current schedule.
	RunManually(ctx context.Context, task *domain.Task) (*domain.Run, error)
	// RunManuallyWithID manually start a run of a task with taskID now, overriding the current schedule.
	RunManuallyWithID(ctx context.Context, taskID string) (*domain.Run, error)
	// RetryRun retry a task run.
	RetryRun(ctx context.Context, run *domain.Run) (*domain.Run, error)
	// RetryRunWithID retry a run with runID of a task with taskID.
	RetryRunWithID(ctx context.Context, taskID, runID string) (*domain.Run, error)
	// CancelRun cancels a running task.
	CancelRun(ctx context.Context, run *domain.Run) error
	// CancelRunWithID cancels a running task.
	CancelRunWithID(ctx context.Context, taskID, runID string) error
	// FindLogs retrieves all logs for a task.
	FindLogs(ctx context.Context, task *domain.Task) ([]domain.LogEvent, error)
	// FindLogsWithID retrieves all logs for a task with taskID.
	FindLogsWithID(ctx context.Context, taskID string) ([]domain.LogEvent, error)
	// FindLabels retrieves labels of a task.
	FindLabels(ctx context.Context, task *domain.Task) ([]domain.Label, error)
	// FindLabelsWithID retrieves labels of an task with taskID.
	FindLabelsWithID(ctx context.Context, taskID string) ([]domain.Label, error)
	// AddLabel adds a label to a task.
	AddLabel(ctx context.Context, task *domain.Task, label *domain.Label) (*domain.Label, error)
	// AddLabelWithID adds a label with id labelID to a task with taskID.
	AddLabelWithID(ctx context.Context, taskID, labelID string) (*domain.Label, error)
	// RemoveLabel removes a label from a task.
	RemoveLabel(ctx context.Context, task *domain.Task, label *domain.Label) error
	// RemoveLabelWithID removes a label with id labelID from a task with taskID.
	RemoveLabelWithID(ctx context.Context, taskID, labelID string) error
}

TasksAPI provides methods for managing tasks and task runs in an InfluxDB server.

Example
package main

import (
	"context"
	"fmt"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")

	ctx := context.Background()
	// Get Delete API client
	tasksAPI := client.TasksAPI()
	// Get organization that will own task
	myorg, err := client.OrganizationsAPI().FindOrganizationByName(ctx, "my-org")
	if err != nil {
		panic(err)
	}
	// task flux script from https://www.influxdata.com/blog/writing-tasks-and-setting-up-alerts-for-influxdb-cloud/
	flux := `fruitCollected = from(bucket: “farming”)
  |> range(start: -task.every)
  |> filter(fn: (r)  => (r._measurement == “totalFruitsCollected))
  |> filter(fn: (r)  => (r._field == “fruits))
  |> group(columns: [“farmName”])
  |> aggregateWindow(fn: sum, every: task.every)
  |> map(fn: (r) => {
    return: _time: r._time,  _stop: r._stop, _start: r._start, _measurement: “fruitCollectionRate”, _field: “fruits”, _value: r._value, farmName: farmName, 
  }
})

fruitCollected 
  |> to(bucket: “farming”)
`
	task, err := tasksAPI.CreateTaskWithEvery(ctx, "fruitCollectedRate", flux, "1h", *myorg.Id)
	if err != nil {
		panic(err)
	}
	// Force running a task
	run, err := tasksAPI.RunManually(ctx, task)
	if err != nil {
		panic(err)
	}

	fmt.Println("Forced run completed on ", *run.FinishedAt, " with status ", *run.Status)

	// Print logs
	logs, err := tasksAPI.FindRunLogs(ctx, run)
	if err != nil {
		panic(err)
	}

	fmt.Println("Log:")
	for _, logEvent := range logs {
		fmt.Println(" Time:", *logEvent.Time, ", Message: ", *logEvent.Message)
	}

	// Close the client
	client.Close()
}
Output:

func NewTasksAPI added in v2.2.0

func NewTasksAPI(apiClient *domain.ClientWithResponses) TasksAPI

NewTasksAPI creates new instance of TasksAPI

type UsersAPI

type UsersAPI interface {
	// GetUsers returns all users
	GetUsers(ctx context.Context) (*[]domain.User, error)
	// FindUserByID returns user with userID
	FindUserByID(ctx context.Context, userID string) (*domain.User, error)
	// FindUserByName returns user with name userName
	FindUserByName(ctx context.Context, userName string) (*domain.User, error)
	// CreateUser creates new user
	CreateUser(ctx context.Context, user *domain.User) (*domain.User, error)
	// CreateUserWithName creates new user with userName
	CreateUserWithName(ctx context.Context, userName string) (*domain.User, error)
	// UpdateUser updates user
	UpdateUser(ctx context.Context, user *domain.User) (*domain.User, error)
	// UpdateUserPassword sets password for an user
	UpdateUserPassword(ctx context.Context, user *domain.User, password string) error
	// UpdateUserPasswordWithID sets password for an user with userID
	UpdateUserPasswordWithID(ctx context.Context, userID string, password string) error
	// DeleteUserWithID deletes an user with userID
	DeleteUserWithID(ctx context.Context, userID string) error
	// DeleteUser deletes an user
	DeleteUser(ctx context.Context, user *domain.User) error
	// Me returns actual user
	Me(ctx context.Context) (*domain.User, error)
	// MeUpdatePassword set password of actual user
	MeUpdatePassword(ctx context.Context, oldPassword, newPassword string) error
	// SignIn exchanges username and password credentials to establish an authenticated session with the InfluxDB server. The Client's authentication token is then ignored, it can be empty.
	SignIn(ctx context.Context, username, password string) error
	// SignOut signs out previously signed in user
	SignOut(ctx context.Context) error
}

UsersAPI provides methods for managing users in a InfluxDB server

Example
package main

import (
	"context"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")

	// Find organization
	org, err := client.OrganizationsAPI().FindOrganizationByName(context.Background(), "my-org")
	if err != nil {
		panic(err)
	}

	// Get users API client
	usersAPI := client.UsersAPI()

	// Create new user
	user, err := usersAPI.CreateUserWithName(context.Background(), "user-01")
	if err != nil {
		panic(err)
	}

	// Set user password
	err = usersAPI.UpdateUserPassword(context.Background(), user, "pass-at-least-8-chars")
	if err != nil {
		panic(err)
	}

	// Add user to organization
	_, err = client.OrganizationsAPI().AddMember(context.Background(), org, user)
	if err != nil {
		panic(err)
	}
	// Ensures background processes finishes
	client.Close()
}
Output:

Example (SignInOut)
package main

import (
	"context"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and empty token
	client := influxdb2.NewClient("http://localhost:8086", "")
	// Always close client at the end
	defer client.Close()

	ctx := context.Background()

	// The first call must be signIn
	err := client.UsersAPI().SignIn(ctx, "username", "password")
	if err != nil {
		panic(err)
	}

	// Perform some authorized operations
	err = client.WriteAPIBlocking("my-org", "my-bucket").WriteRecord(ctx, "test,a=rock,b=local f=1.2,i=-5i")
	if err != nil {
		panic(err)
	}

	// Sign out at the end
	err = client.UsersAPI().SignOut(ctx)
	if err != nil {
		panic(err)
	}
}
Output:

func NewUsersAPI

func NewUsersAPI(apiClient *domain.ClientWithResponses, httpService http.Service, httpClient *nethttp.Client) UsersAPI

NewUsersAPI creates new instance of UsersAPI

type WriteAPI

type WriteAPI interface {
	// WriteRecord writes asynchronously line protocol record into bucket.
	// WriteRecord adds record into the buffer which is sent on the background when it reaches the batch size.
	// Blocking alternative is available in the WriteAPIBlocking interface
	WriteRecord(line string)
	// WritePoint writes asynchronously Point into bucket.
	// WritePoint adds Point into the buffer which is sent on the background when it reaches the batch size.
	// Blocking alternative is available in the WriteAPIBlocking interface
	WritePoint(point *write.Point)
	// Flush forces all pending writes from the buffer to be sent
	Flush()
	// Errors returns a channel for reading errors which occurs during async writes.
	// Must be called before performing any writes for errors to be collected.
	// The chan is unbuffered and must be drained or the writer will block.
	Errors() <-chan error
	// SetWriteFailedCallback sets callback allowing custom handling of failed writes.
	// If callback returns true, failed batch will be retried, otherwise discarded.
	SetWriteFailedCallback(cb WriteFailedCallback)
}

WriteAPI is Write client interface with non-blocking methods for writing time series data asynchronously in batches into an InfluxDB server. WriteAPI can be used concurrently. When using multiple goroutines for writing, use a single WriteAPI instance in all goroutines.

Example
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/influxdata/influxdb-client-go/v2/api/write"
	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	// Get non-blocking write client
	writeAPI := client.WriteAPI("my-org", "my-bucket")
	// write some points
	for i := 0; i < 100; i++ {
		// create point
		p := write.NewPoint(
			"system",
			map[string]string{
				"id":       fmt.Sprintf("rack_%v", i%10),
				"vendor":   "AWS",
				"hostname": fmt.Sprintf("host_%v", i%100),
			},
			map[string]interface{}{
				"temperature": rand.Float64() * 80.0,
				"disk_free":   rand.Float64() * 1000.0,
				"disk_total":  (i/10 + 1) * 1000000,
				"mem_total":   (i/100 + 1) * 10000000,
				"mem_free":    rand.Uint64(),
			},
			time.Now())
		// write asynchronously
		writeAPI.WritePoint(p)
	}
	// Force all unwritten data to be sent
	writeAPI.Flush()
	// Ensures background processes finishes
	client.Close()
}
Output:

Example (Errors)
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/influxdata/influxdb-client-go/v2/api/write"
	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	// Get non-blocking write client
	writeAPI := client.WriteAPI("my-org", "my-bucket")
	// Get errors channel
	errorsCh := writeAPI.Errors()
	// Create go proc for reading and logging errors
	go func() {
		for err := range errorsCh {
			fmt.Printf("write error: %s\n", err.Error())
		}
	}()
	// write some points
	for i := 0; i < 100; i++ {
		// create point
		p := write.NewPointWithMeasurement("stat").
			AddTag("id", fmt.Sprintf("rack_%v", i%10)).
			AddTag("vendor", "AWS").
			AddTag("hostname", fmt.Sprintf("host_%v", i%100)).
			AddField("temperature", rand.Float64()*80.0).
			AddField("disk_free", rand.Float64()*1000.0).
			AddField("disk_total", (i/10+1)*1000000).
			AddField("mem_total", (i/100+1)*10000000).
			AddField("mem_free", rand.Uint64()).
			SetTime(time.Now())
		// write asynchronously
		writeAPI.WritePoint(p)
	}
	// Force all unwritten data to be sent
	writeAPI.Flush()
	// Ensures background processes finishes
	client.Close()
}
Output:

type WriteAPIBlocking

type WriteAPIBlocking interface {
	// WriteRecord writes line protocol record(s) into bucket.
	// WriteRecord writes without implicit batching. Batch is created from given number of records.
	// Individual arguments can also be batches (multiple records separated by newline).
	// Non-blocking alternative is available in the WriteAPI interface
	WriteRecord(ctx context.Context, line ...string) error
	// WritePoint data point into bucket.
	// WritePoint writes without implicit batching. Batch is created from given number of points
	// Non-blocking alternative is available in the WriteAPI interface
	WritePoint(ctx context.Context, point ...*write.Point) error
}

WriteAPIBlocking offers blocking methods for writing time series data synchronously into an InfluxDB server. It doesn't implicitly create batches of points. It is intended to use for writing less frequent data, such as a weather sensing, or if there is a need to have explicit control of failed batches.

WriteAPIBlocking can be used concurrently. When using multiple goroutines for writing, use a single WriteAPIBlocking instance in all goroutines.

To add implicit batching, use a wrapper, such as:

type writer struct {
	batch []*write.Point
	writeAPI api.WriteAPIBlocking
	batchSize int
}

func (w *writer) CurrentBatch() []*write.Point {
	return w.batch
}

func newWriter(writeAPI api.WriteAPIBlocking, batchSize int) *writer {
	return &writer{
		batch:     make([]*write.Point, 0, batchSize),
		writeAPI:  writeAPI,
		batchSize: batchSize,
	}
}

func (w *writer) write(ctx context.Context, p *write.Point) error {
	w.batch = append(w.batch, p)
	if len(w.batch) == w.batchSize {
		err := w.writeAPI.WritePoint(ctx, w.batch...)
		if err != nil {
			return err
		}
		w.batch = w.batch[:0]
	}
	return nil
}
Example
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/influxdata/influxdb-client-go/v2/api/write"
	influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
)

func main() {
	// Create a new client using an InfluxDB server base URL and an authentication token
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	// Get blocking write client
	writeAPI := client.WriteAPIBlocking("my-org", "my-bucket")
	// write some points
	for i := 0; i < 100; i++ {
		// create data point
		p := write.NewPoint(
			"system",
			map[string]string{
				"id":       fmt.Sprintf("rack_%v", i%10),
				"vendor":   "AWS",
				"hostname": fmt.Sprintf("host_%v", i%100),
			},
			map[string]interface{}{
				"temperature": rand.Float64() * 80.0,
				"disk_free":   rand.Float64() * 1000.0,
				"disk_total":  (i/10 + 1) * 1000000,
				"mem_total":   (i/100 + 1) * 10000000,
				"mem_free":    rand.Uint64(),
			},
			time.Now())
		// write synchronously
		err := writeAPI.WritePoint(context.Background(), p)
		if err != nil {
			panic(err)
		}
	}
	// Ensures background processes finishes
	client.Close()
}
Output:

func NewWriteAPIBlocking

func NewWriteAPIBlocking(org string, bucket string, service http2.Service, writeOptions *write.Options) WriteAPIBlocking

NewWriteAPIBlocking creates new instance of blocking write client for writing data to bucket belonging to org

type WriteAPIImpl

type WriteAPIImpl struct {
	// contains filtered or unexported fields
}

WriteAPIImpl provides main implementation for WriteAPI

func NewWriteAPI

func NewWriteAPI(org string, bucket string, service http2.Service, writeOptions *write.Options) *WriteAPIImpl

NewWriteAPI returns new non-blocking write client for writing data to bucket belonging to org

func (*WriteAPIImpl) Close

func (w *WriteAPIImpl) Close()

Close finishes outstanding write operations, stop background routines and closes all channels

func (*WriteAPIImpl) Errors

func (w *WriteAPIImpl) Errors() <-chan error

Errors returns a channel for reading errors which occurs during async writes. Must be called before performing any writes for errors to be collected. The chan is unbuffered and must be drained or the writer will block.

func (*WriteAPIImpl) Flush

func (w *WriteAPIImpl) Flush()

Flush forces all pending writes from the buffer to be sent

func (*WriteAPIImpl) SetWriteFailedCallback added in v2.5.0

func (w *WriteAPIImpl) SetWriteFailedCallback(cb WriteFailedCallback)

SetWriteFailedCallback sets callback allowing custom handling of failed writes. If callback returns true, failed batch will be retried, otherwise discarded.

func (*WriteAPIImpl) WritePoint

func (w *WriteAPIImpl) WritePoint(point *write.Point)

WritePoint writes asynchronously Point into bucket. WritePoint adds Point into the buffer which is sent on the background when it reaches the batch size. Blocking alternative is available in the WriteAPIBlocking interface

func (*WriteAPIImpl) WriteRecord

func (w *WriteAPIImpl) WriteRecord(line string)

WriteRecord writes asynchronously line protocol record into bucket. WriteRecord adds record into the buffer which is sent on the background when it reaches the batch size. Blocking alternative is available in the WriteAPIBlocking interface

type WriteFailedCallback added in v2.5.0

type WriteFailedCallback func(batch string, error http2.Error, retryAttempts uint) bool

WriteFailedCallback is synchronously notified in case non-blocking write fails. batch contains complete payload, error holds detailed error information, retryAttempts means number of retries, 0 if it failed during first write. It must return true if WriteAPI should continue with retrying, false will discard the batch.

Directories

Path Synopsis
Package http provides HTTP servicing related code.
Package http provides HTTP servicing related code.
Package query defined types for representing flux query result
Package query defined types for representing flux query result
Package write provides the Point struct
Package write provides the Point struct

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL