greptime

package module
v0.7.2
Published: Apr 21, 2026 License: Apache-2.0 Imports: 18 Imported by: 3

README


GreptimeDB Go Ingester

Provides an API to insert data into GreptimeDB.

How To Use

Installation
go get -u github.com/GreptimeTeam/greptimedb-ingester-go
Import
import greptime "github.com/GreptimeTeam/greptimedb-ingester-go"
Config

Initialize a Config for the Client

cfg := greptime.NewConfig("<host>").
    WithPort(4001).
    WithAuth("<username>", "<password>").
    WithDatabase("<database>")
Options
Secure
cfg.WithInsecure(false) // default insecure=true
Keepalive
cfg.WithKeepalive(time.Second*30, time.Second*5) // keepalive isn't enabled by default
Multiple endpoints

Configure several GreptimeDB endpoints to spread writes across an HA cluster. Unary calls (Write, Delete, HealthCheck, BulkWrite) pick an endpoint per call through the configured picker. A streaming session started by StreamWrite / StreamDelete binds to one endpoint until CloseStream; if that endpoint fails mid-stream the Send returns the error and the next StreamWrite picks a fresh endpoint. Auth, TLS, keepalive and telemetry are shared across all endpoints.

import "github.com/GreptimeTeam/greptimedb-ingester-go/loadbalancer"

cfg := greptime.NewConfig().
    WithDatabase("<database>").
    WithEndpoints("host1:4001", "host2:4001", "host3:4001").
    WithLoadBalancer(loadbalancer.NewRoundRobin()) // default is NewRandom()

See examples/multi_endpoint for a runnable demo that reports per-endpoint dispatch counts.

Client
c, err := greptime.NewClient(cfg)
...
defer c.Close()
Insert & StreamInsert
  • You can insert data into GreptimeDB via different styles:

  • Streaming insert sends data into GreptimeDB without waiting for the response.

Table style

You can define the schema via Table and Column, then call AddRow to append the actual data you want to write.

Define table schema, and add rows
import (
    "github.com/GreptimeTeam/greptimedb-ingester-go/table"
    "github.com/GreptimeTeam/greptimedb-ingester-go/table/types"
)

tbl, err := table.New("<table_name>")

tbl.AddTagColumn("id", types.INT64)
tbl.AddFieldColumn("host", types.STRING)
tbl.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND)

err = tbl.AddRow(1, "127.0.0.1", time.Now())
err = tbl.AddRow(2, "127.0.0.2", time.Now())
...
Write into GreptimeDB
resp, err := c.Write(context.Background(), tbl)
Delete from GreptimeDB
dtbl, err := table.New("<table_name>")
dtbl.AddTagColumn("id", types.INT64)
dtbl.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND)

// timestamp is the time of the row you want to delete
err = dtbl.AddRow(1, timestamp)

affected, err := c.Delete(context.Background(), dtbl)
Stream Write into GreptimeDB
err := c.StreamWrite(context.Background(), tbl)
...
affected, err := c.CloseStream(ctx)
Stream Delete from GreptimeDB
err := c.StreamDelete(context.Background(), tbl)
...
affected, err := c.CloseStream(ctx)
ORM style

If you prefer an ORM style, defining the column-field relationship via struct field tags, you can use the following approach.

Tag
  • greptime is the struct tag key
  • tag, field, and timestamp declare the SemanticType; their values are ignored
  • column defines the column name
  • type defines the data type; if type is timestamp, a precision key is also supported
  • the metadata separator is ; and the key-value separator is :

The supported values of type are the same as those listed in Datatypes supported, and are case insensitive.

Fields marked with greptime:"-" are ignored when writing.

define struct with tags
type Monitor struct {
    ID          int64     `greptime:"tag;column:id;type:int64"`
    Host        string    `greptime:"field;column:host;type:string"`
    Ts          time.Time `greptime:"timestamp;column:ts;type:timestamp;precision:millisecond"`
}

// TableName is to define the table name.
func (Monitor) TableName() string {
    return "<table_name>"
}
Instantiate your struct
monitors := []Monitor{
    {
        ID:   randomId(),
        Host: "127.0.0.1",
        Ts:   time.Now(),
    },
    {
        ID:   randomId(),
        Host: "127.0.0.2",
        Ts:   time.Now(),
    },
}
WriteObject into GreptimeDB
resp, err := c.WriteObject(context.Background(), monitors)
DeleteObject in GreptimeDB
deleteMonitors := monitors[:1]

affected, err := c.DeleteObject(context.Background(), deleteMonitors)
Stream WriteObject into GreptimeDB
err := c.StreamWriteObject(context.Background(), monitors)
...
affected, err := c.CloseStream(ctx)
Stream DeleteObject in GreptimeDB
deleteMonitors := monitors[:1]

err := c.StreamDeleteObject(context.Background(), deleteMonitors)
...
affected, err := c.CloseStream(ctx)

Datatypes supported

The GreptimeDB column lists the data types supported by the library, and the Go column gives the matching Go type.

| GreptimeDB | Go | Description |
|------------|----|-------------|
| INT8 | int8 | -128 ~ 127 |
| INT16 | int16 | -32768 ~ 32767 |
| INT32 | int32 | -2147483648 ~ 2147483647 |
| INT64, INT | int64 | -9223372036854775808 ~ 9223372036854775807 |
| UINT8 | uint8 | 0 ~ 255 |
| UINT16 | uint16 | 0 ~ 65535 |
| UINT32 | uint32 | 0 ~ 4294967295 |
| UINT64, UINT | uint64 | 0 ~ 18446744073709551615 |
| FLOAT32 | float32 | 32-bit IEEE 754 floating point values |
| FLOAT64, FLOAT | float64 | Double precision IEEE 754 floating point values |
| BOOLEAN, BOOL | bool | TRUE or FALSE bool values |
| STRING | string | UTF-8 encoded strings. Holds up to 2,147,483,647 bytes of data |
| BINARY, BYTES | []byte | Variable-length binary values. Holds up to 2,147,483,647 bytes of data |
| DATE | Int or time.Time | 32-bit date values representing the days since UNIX Epoch |
| DATETIME | Int or time.Time | 64-bit timestamp values with microseconds precision, equivalent to TIMESTAMP_MICROSECOND |
| TIMESTAMP_SECOND | Int or time.Time | 64-bit timestamp values with seconds precision, range: [-262144-01-01 00:00:00, +262143-12-31 23:59:59] |
| TIMESTAMP_MILLISECOND, TIMESTAMP | Int or time.Time | 64-bit timestamp values with milliseconds precision, range: [-262144-01-01 00:00:00.000, +262143-12-31 23:59:59.999] |
| TIMESTAMP_MICROSECOND | Int or time.Time | 64-bit timestamp values with microseconds precision, range: [-262144-01-01 00:00:00.000000, +262143-12-31 23:59:59.999999] |
| TIMESTAMP_NANOSECOND | Int or time.Time | 64-bit timestamp values with nanoseconds precision, range: [1677-09-21 00:12:43.145225, 2262-04-11 23:47:16.854775807] |
| JSON | string | JSON data |

NOTE: Int stands for any Go integer or unsigned-integer type.

Query

You can use an ORM library like GORM with a MySQL or PostgreSQL driver to connect to GreptimeDB and retrieve data from it.

type Monitor struct {
    ID          int64     `gorm:"primaryKey;column:id"`
    Host        string    `gorm:"column:host"`
    Ts          time.Time `gorm:"column:ts"`
}

// Get all monitors
var monitors []Monitor
result := db.Find(&monitors)
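The snippet above assumes an initialized *gorm.DB handle. A connection sketch, assuming GreptimeDB's MySQL-compatible endpoint on its default port 4002 (host, credentials, and database are placeholders to fill in for your deployment):

```go
import (
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
)

// DSN targeting GreptimeDB's MySQL-compatible endpoint.
dsn := "<username>:<password>@tcp(<host>:4002)/<database>?charset=utf8mb4&parseTime=True"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
	// handle connection error
}
```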

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client added in v0.2.0

type Client struct {
	// contains filtered or unexported fields
}

Client helps to write data into GreptimeDB. A Client is safe for concurrent use by multiple goroutines; one instance per application is typical.

When configured with multiple endpoints via Config.WithEndpoints, unary calls (Write, Delete, HealthCheck, BulkWrite) pick an endpoint per call through the configured loadbalancer.Picker. A streaming session opened by StreamWrite/StreamDelete binds to a single endpoint until CloseStream; if that endpoint fails mid-stream, the Send returns the underlying error and the next StreamWrite call picks a fresh endpoint to open a new stream.

func NewClient added in v0.2.0

func NewClient(cfg *Config) (*Client, error)

NewClient creates the greptimedb client responsible for writing data into GreptimeDB. Endpoints configured via WithEndpoints take precedence over Host:Port; each entry must be a "host:port" string.

func (*Client) BulkWrite added in v0.7.0

func (c *Client) BulkWrite(ctx context.Context, table *table.Table) (*gpb.GreptimeResponse, error)

BulkWrite performs a high-efficiency bulk data write operation to GreptimeDB using Apache Arrow format. It sends the entire table data in a single batch, which is more efficient for large datasets compared to row-by-row writes. The table must have columns and rows properly defined before calling this method.
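A minimal usage sketch (the table name, columns, and row values are illustrative, not part of the API):

```go
// Define the schema once, then accumulate many rows before one BulkWrite.
tbl, err := table.New("sensor_readings") // hypothetical table name
if err != nil {
	// handle error
}
tbl.AddTagColumn("device", types.STRING)
tbl.AddFieldColumn("value", types.FLOAT64)
tbl.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND)

for i := 0; i < 100_000; i++ {
	if err := tbl.AddRow(fmt.Sprintf("dev-%d", i%10), float64(i), time.Now()); err != nil {
		// handle error
	}
}

// One Arrow-encoded batch instead of many row-by-row requests.
resp, err := c.BulkWrite(context.Background(), tbl)
```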

func (*Client) Close added in v0.6.2

func (c *Client) Close() error

Close terminates all underlying gRPC connections. Any active stream is aborted by the connection teardown; callers that need a graceful half-close must call CloseStream first. Call this method when the client is no longer needed. Close is idempotent; RPCs issued after Close fail with a gRPC transport error rather than panicking.

func (*Client) CloseStream added in v0.2.0

func (c *Client) CloseStream(_ context.Context) (*gpb.AffectedRows, error)

CloseStream closes the stream. Once we’ve finished writing our client’s requests to the stream using client.StreamWrite or client.StreamWriteObject, we need to call client.CloseStream to let GreptimeDB know that we’ve finished writing and are expecting to receive a response.
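The write-then-close pattern looks like this (a sketch; it assumes a slice of tables built as in StreamWrite's example):

```go
// Send any number of tables over the open stream...
for _, tbl := range tables {
	if err := c.StreamWrite(context.Background(), tbl); err != nil {
		// a mid-stream failure surfaces here; the stream must be re-opened
		break
	}
}

// ...then half-close and wait for the server's summary response.
affected, err := c.CloseStream(context.Background())
```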

func (*Client) Delete added in v0.5.0

func (c *Client) Delete(ctx context.Context, tables ...*table.Table) (*gpb.GreptimeResponse, error)

Delete is to delete the data from GreptimeDB via explicit schema.

tbl, err := table.New(<tableName>)

// add column at first. This is to define the schema of the table.
tbl.AddTagColumn("tag1", types.INT64)
tbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MILLISECOND)

// you can add multiple row(s). This is the real data.
tbl.AddRow(1, timestamp)

// delete the data from GreptimeDB
resp, err := client.Delete(context.Background(), tbl)

func (*Client) DeleteObject added in v0.5.0

func (c *Client) DeleteObject(ctx context.Context, obj any) (*gpb.GreptimeResponse, error)

DeleteObject is like [Delete], deleting the data from GreptimeDB, but the schema is defined in the struct tag.

resp, err := client.DeleteObject(context.Background(), deleteMonitors)

func (*Client) HealthCheck added in v0.5.2

func (c *Client) HealthCheck(ctx context.Context) (*gpb.HealthCheckResponse, error)

HealthCheck will check GreptimeDB health status. With multiple endpoints configured, HealthCheck probes whichever endpoint the picker returns for this call, not every endpoint.

func (*Client) StreamDelete added in v0.5.0

func (c *Client) StreamDelete(ctx context.Context, tables ...*table.Table) error

StreamDelete is to delete the data from GreptimeDB via explicit schema.

tbl, err := table.New(<tableName>)

// add column at first. This is to define the schema of the table.
tbl.AddTagColumn("tag1", types.INT64)
tbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MILLISECOND)

// you can add multiple row(s). This is the real data.
tbl.AddRow(1, timestamp)

// delete the data from GreptimeDB
err := client.StreamDelete(context.Background(), tbl)

func (*Client) StreamDeleteObject added in v0.5.0

func (c *Client) StreamDeleteObject(ctx context.Context, body any) error

StreamDeleteObject is like [StreamDelete], deleting the data from GreptimeDB, but the schema is defined in the struct tag.

err := client.StreamDeleteObject(context.Background(), deleteMonitors)

func (*Client) StreamWrite added in v0.2.0

func (c *Client) StreamWrite(ctx context.Context, tables ...*table.Table) error

StreamWrite is to send the data into GreptimeDB via explicit schema.

tbl, err := table.New(<tableName>)

// add column at first. This is to define the schema of the table.
tbl.AddTagColumn("tag1", types.INT64)
tbl.AddFieldColumn("field1", types.STRING)
tbl.AddFieldColumn("field2", types.FLOAT64)
tbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MILLISECOND)
timestamp := time.Now()
// you can add multiple row(s). This is the real data.
tbl.AddRow(1, "hello", 1.1, timestamp)

// send data into GreptimeDB
err := client.StreamWrite(context.Background(), tbl)

func (*Client) StreamWriteObject added in v0.3.0

func (c *Client) StreamWriteObject(ctx context.Context, body any) error

StreamWriteObject is like [StreamWrite] to send the data into GreptimeDB, but schema is defined in the struct tag.

type monitor struct {
  ID          int64     `greptime:"tag;column:id;type:int64"`
  Host        string    `greptime:"tag;column:host;type:string"`
  Memory      uint64    `greptime:"field;column:memory;type:uint64"`
  Cpu         float64   `greptime:"field;column:cpu;type:float64"`
  Temperature int64     `greptime:"field;column:temperature;type:int64"`
  Running     bool      `greptime:"field;column:running;type:boolean"`
  Ts          time.Time `greptime:"timestamp;column:ts;type:timestamp;precision:millisecond"`
}

func (monitor) TableName() string {
  return monitorTableName
}

monitors := []monitor{
	{
	    ID:          randomId(),
	    Host:        "127.0.0.1",
	    Memory:      1,
	    Cpu:         1.0,
	    Temperature: -1,
	    Ts:          time1,
	    Running:     true,
	},
	{
	    ID:          randomId(),
	    Host:        "127.0.0.2",
	    Memory:      2,
	    Cpu:         2.0,
	    Temperature: -2,
	    Ts:          time2,
	    Running:     true,
	},
}

err := client.StreamWriteObject(context.Background(), monitors)

func (*Client) Write added in v0.2.0

func (c *Client) Write(ctx context.Context, tables ...*table.Table) (*gpb.GreptimeResponse, error)

Write is to write the data into GreptimeDB via explicit schema.

tbl, err := table.New(<tableName>)

// add column at first. This is to define the schema of the table.
tbl.AddTagColumn("tag1", types.INT64)
tbl.AddFieldColumn("field1", types.STRING)
tbl.AddFieldColumn("field2", types.FLOAT64)
tbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MILLISECOND)
timestamp := time.Now()
// you can add multiple row(s). This is the real data.
tbl.AddRow(1, "hello", 1.1, timestamp)

// write data into GreptimeDB
resp, err := client.Write(context.Background(), tbl)

func (*Client) WriteObject added in v0.3.0

func (c *Client) WriteObject(ctx context.Context, obj any) (*gpb.GreptimeResponse, error)

WriteObject is like [Write] to write the data into GreptimeDB, but schema is defined in the struct tag.

type Monitor struct {
  ID          int64     `greptime:"tag;column:id;type:int64"`
  Host        string    `greptime:"tag;column:host;type:string"`
  Memory      uint64    `greptime:"field;column:memory;type:uint64"`
  Cpu         float64   `greptime:"field;column:cpu;type:float64"`
  Temperature int64     `greptime:"field;column:temperature;type:int64"`
  Running     bool      `greptime:"field;column:running;type:boolean"`
  Ts          time.Time `greptime:"timestamp;column:ts;type:timestamp;precision:millisecond"`
}

func (Monitor) TableName() string {
  return monitorTableName
}

monitors := []Monitor{
	{
	    ID:          randomId(),
	    Host:        "127.0.0.1",
	    Memory:      1,
	    Cpu:         1.0,
	    Temperature: -1,
	    Ts:          time1,
	    Running:     true,
	},
	{
	    ID:          randomId(),
	    Host:        "127.0.0.2",
	    Memory:      2,
	    Cpu:         2.0,
	    Temperature: -2,
	    Ts:          time2,
	    Running:     true,
	},
}

resp, err := client.WriteObject(context.Background(), monitors)

type Config added in v0.2.0

type Config struct {
	Host     string // no scheme or port included. example: 127.0.0.1
	Port     int    // default: 4001
	Username string
	Password string
	Database string // the default database
	// contains filtered or unexported fields
}

Config is to define how the Client behaves.

  • Host is 127.0.0.1 in local environment.
  • Port default value is 4001.
  • Username and Password can be left empty in a local environment; you can find them in the GreptimeCloud service detail page.
  • Database is the default database the client will operate on, but you can change it in an InsertRequest or QueryRequest.

func NewConfig added in v0.2.0

func NewConfig(hosts ...string) *Config

NewConfig initializes a Config.

  • NewConfig() — no endpoint yet; caller must call WithEndpoints.
  • NewConfig("host") — single-endpoint legacy form; port defaults to 4001 and can be changed with WithPort.
  • NewConfig("host:port") — single-endpoint shorthand equivalent to NewConfig().WithEndpoints("host:port"). WithPort has no effect.
  • NewConfig("host1:4001", "host2:4001", ...) — equivalent to NewConfig().WithEndpoints(args...). WithPort has no effect.
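For example, these two configurations target the same single endpoint (the hostname is hypothetical):

```go
// Legacy host + port form.
cfgA := greptime.NewConfig("db.example.com").WithPort(4001)

// Endpoint shorthand; WithPort would have no effect on this form.
cfgB := greptime.NewConfig("db.example.com:4001")
```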

func (*Config) WithAuth added in v0.2.0

func (c *Config) WithAuth(username, password string) *Config

WithAuth helps to specify the Basic Auth username and password. Leave them empty in a local environment.

func (*Config) WithDatabase added in v0.2.0

func (c *Config) WithDatabase(database string) *Config

WithDatabase helps to specify the default database the client operates on.

func (*Config) WithDialOption added in v0.4.0

func (c *Config) WithDialOption(opt grpc.DialOption) *Config

WithDialOption helps to specify a dial option that is not yet supported by the ingester SDK.

func (*Config) WithEndpoints added in v0.7.2

func (c *Config) WithEndpoints(endpoints ...string) *Config

WithEndpoints configures multiple GreptimeDB endpoints for client-side load balancing. Each endpoint must be a "host:port" string. When at least one endpoint is provided, Host and Port are ignored. Passing no arguments is a no-op and the client falls back to Host:Port.

Authentication, TLS, keepalive, telemetry and any dial options are shared across all endpoints; per-endpoint overrides are not supported.

func (*Config) WithInsecure added in v0.4.0

func (c *Config) WithInsecure(insecure bool) *Config

WithInsecure sets whether the client connects without TLS. The default is insecure (true); pass false to enable TLS.

TODO(yuanbohan): support more tls options

func (*Config) WithKeepalive added in v0.2.0

func (c *Config) WithKeepalive(time, timeout time.Duration) *Config

WithKeepalive helps to set the keepalive option.

  • time: after a duration of this time, if the client doesn't see any activity, it pings the server to check whether the transport is still alive. If set below 10s, a minimum value of 10s is used instead.
  • timeout: after pinging for a keepalive check, the client waits for this duration; if no activity is seen even after that, the connection is closed.

func (*Config) WithLoadBalancer added in v0.7.2

func (c *Config) WithLoadBalancer(picker loadbalancer.Picker) *Config

WithLoadBalancer sets the load-balancing strategy used when more than one endpoint is configured. Defaults to loadbalancer.NewRandom() when unset. Has no observable effect when only a single endpoint is in use.

func (*Config) WithMeterProvider added in v0.5.3

func (c *Config) WithMeterProvider(p metric.MeterProvider) *Config

WithMeterProvider provides a MeterProvider for the SDK. If metrics collection is not enabled, this option has no effect. If metrics collection is enabled and this option is not provided, the global MeterProvider is used.

func (*Config) WithMetricsEnabled added in v0.5.3

func (c *Config) WithMetricsEnabled(b bool) *Config

WithMetricsEnabled enables/disables collection of SDK's metrics. Disabled by default.
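For example, to route the SDK's metrics through an OpenTelemetry MeterProvider (the bare provider construction is illustrative; wire up readers/exporters as needed, and any metric.MeterProvider works):

```go
import sdkmetric "go.opentelemetry.io/otel/sdk/metric"

provider := sdkmetric.NewMeterProvider()
cfg.WithMetricsEnabled(true).WithMeterProvider(provider)
```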

func (*Config) WithPort added in v0.2.0

func (c *Config) WithPort(port int) *Config

WithPort sets the Port field. Do not change it unless you know what you are doing.

func (*Config) WithTraceProvider added in v0.5.3

func (c *Config) WithTraceProvider(p trace.TracerProvider) *Config

WithTraceProvider provides a TracerProvider for the SDK. If trace collection is not enabled, this option has no effect. If trace collection is enabled and this option is not provided, the global TracerProvider is used.

func (*Config) WithTracesEnabled added in v0.5.3

func (c *Config) WithTracesEnabled(b bool) *Config

WithTracesEnabled enables/disables collection of SDK's traces. Disabled by default.

Directories

Path Synopsis
examples
bulkwrite command
healthcheck command
hints command
jsondata command
multi_endpoint command
Writes to several GreptimeDB endpoints with client-side load balancing.
object command
table command
internal
pool
Package pool manages a set of gRPC connections to GreptimeDB endpoints and dispatches calls to one of them through a pluggable loadbalancer.Picker.
loadbalancer
Package loadbalancer provides pluggable strategies for distributing requests across multiple GreptimeDB endpoints.
