godatabend

package module
v0.4.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2023 License: Apache-2.0 Imports: 35 Imported by: 0

README

databend-go

Golang driver for databend cloud

Installation

go get github.com/databendcloud/databend-go

Key features

  • Supports native Databend HTTP client-server protocol
  • Compatibility with database/sql

Examples

Connecting

Connection can be achieved either via a DSN string with the format https://user:password@host/database?<query_option>=<value> and sql/Open method such as https://username:password@tenant--warehousename.ch.datafusecloud.com/test.

import (
  "database/sql"
  _ "github.com/databendcloud/databend-go"
)

func ConnectDSN() error {
    dsn, cfg, err := getDSN()
    if err != nil {
    log.Fatalf("failed to create DSN from Config: %v, err: %v", cfg, err)
    }
    conn, err := sql.Open("databend", dsn)
    if err != nil {
        return err
    }
    return conn.Ping()
}

Connection Settings

If you are using the databend cloud you can get the connection settings using the following way.

  • host - the connect host such as tenant--warehousename.ch.datafusecloud.com that you can get from databend cloud as follows: image

  • username/password - auth credentials that you can get from databend cloud connect page as above

  • database - select the current default database

Execution

Once a connection has been obtained, users can issue sql statements for execution via the Exec method.

    dsn, cfg, err := getDSN()
	if err != nil {
		log.Fatalf("failed to create DSN from Config: %v, err: %v", cfg, err)
	}
	conn, err := sql.Open("databend", dsn)
	if err != nil {
		fmt.Println(err)
	}
	conn.Exec(`DROP TABLE IF EXISTS data`)
	_, err = conn.Exec(`
    CREATE TABLE IF NOT EXISTS  data(
        Col1 TINYINT,
        Col2 VARCHAR 
    ) 
    `)
	if err != nil {
		fmt.Println(err)
	}
	_, err = conn.Exec("INSERT INTO data VALUES (1, 'test-1')")

Batch Insert

If the create table SQL is CREATE TABLE test ( i64 Int64, u64 UInt64, f64 Float64, s String, s2 String, a16 Array(Int16), a8 Array(UInt8), d Date, t DateTime) you can use the next code to batch insert data:

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/databendcloud/databend-go"
)

func main() {
	conn, err := sql.Open("databend", "http://databend:databend@localhost:8000/default?sslmode=disable")
	tx, err := conn.Begin()
	if err != nil {
		fmt.Println(err)
	}
	batch, err := tx.Prepare(fmt.Sprintf("INSERT INTO %s VALUES", "test"))
	for i := 0; i < 10; i++ {
		_, err = batch.Exec(
			"1234",
			"2345",
			"3.1415",
			"test",
			"test2",
			"[4, 5, 6]",
			"[1, 2, 3]",
			"2021-01-01",
			"2021-01-01 00:00:00",
		)
	}
	err = tx.Commit()
}

Querying Row/s

Querying a single row can be achieved using the QueryRow method. This returns a *sql.Row, on which Scan can be invoked with pointers to variables into which the columns should be marshaled.

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/databendcloud/databend-go"
)

func main() {
	// create table data (col1 uint8, col2 string);
	// insert into data values(1,'col2');
	conn, err := sql.Open("databend", "http://databend:databend@localhost:8000/default?sslmode=disable")
	if err != nil {
		fmt.Println(err)
	}
	row := conn.QueryRow("SELECT * FROM data")
	var (
		col1 uint8
		col2 string
	)
	if err := row.Scan(&col1, &col2); err != nil {
		fmt.Println(err)
	}
	fmt.Println(col2)
}

Iterating multiple rows requires the Query method. This returns a *sql.Rows struct on which Next can be invoked to iterate through the rows. QueryContext equivalent allows passing of a context.

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/databendcloud/databend-go"
)

func main() {
	// create table data (col1 uint8, col2 string);
	// insert into data values(1,'col2');
	conn, err := sql.Open("databend", "http://databend:databend@localhost:8000/default?sslmode=disable")
	if err != nil {
		fmt.Println(err)
	}
	row, err := conn.Query("SELECT * FROM data")
	var (
		col1 uint8
		col2 string
	)
	for row.Next() {
		if err := row.Scan(&col1, &col2); err != nil {
			fmt.Println(err)
		}
		fmt.Println(col2)
	}
}

Compatibility

  • If databend version >= v0.9.0 or later, you need to use databend-go version >= v0.3.0.

Documentation

Index

Constants

View Source
const (
	DatabendTenantHeader    = "X-DATABEND-TENANT"
	DatabendWarehouseHeader = "X-DATABEND-WAREHOUSE"
	Authorization           = "Authorization"
	WarehouseRoute          = "X-DATABEND-ROUTE"
	UserAgent               = "User-Agent"
)
View Source
const DBSessionIDKey contextKey = "LOG_SESSION_ID"

DBSessionIDKey is context key of session id

View Source
const SFSessionUserKey contextKey = "LOG_USER"

SFSessionUserKey is context key of user id of a session

View Source
const (
	SSL_MODE_DISABLE = "disable"
)

Variables

View Source
var (
	ProvisionWarehouseTimeout = "ProvisionWarehouseTimeout"

	ErrDoRequest    = errors.New("DoReqeustFailed")
	ErrReadResponse = errors.New("ReadResponseFailed")
)
View Source
var (
	ErrPlaceholderCount = errors.New("databend: wrong placeholder count")
	ErrNoLastInsertID   = errors.New("no LastInsertId available")
	ErrNoRowsAffected   = errors.New("no RowsAffected available")
)
View Source
var LogKeys = [...]contextKey{DBSessionIDKey, SFSessionUserKey}

LogKeys these keys in context should be included in logging messages when using logger.WithContext

Functions

func Array

func Array(v interface{}) driver.Valuer

Array wraps slice or array into driver.Valuer interface to allow pass through it from database/sql

func DBCallerPrettyfier

func DBCallerPrettyfier(frame *runtime.Frame) (string, string)

DBCallerPrettyfier to provide base file name and function name from calling frame used in SFLogger

func Date

func Date(t time.Time) driver.Valuer

Date returns date for t

func Decimal128

func Decimal128(v interface{}, s int32) driver.Valuer

Decimal128 converts value to Decimal128 of precision S. The value can be a number or a string. The S (scale) parameter specifies the number of decimal places.

func Decimal32

func Decimal32(v interface{}, s int32) driver.Valuer

Decimal32 converts value to Decimal32 of precision S. The value can be a number or a string. The S (scale) parameter specifies the number of decimal places.

func Decimal64

func Decimal64(v interface{}, s int32) driver.Valuer

Decimal64 converts value to Decimal64 of precision S. The value can be a number or a string. The S (scale) parameter specifies the number of decimal places.

func DeregisterTLSConfig

func DeregisterTLSConfig(key string)

DeregisterTLSConfig removes the tls.Config associated with key.

func IP

func IP(i net.IP) driver.Valuer

IP returns compatible database format for net.IP

func IsAuthFailed added in v0.1.1

func IsAuthFailed(err error) bool

func IsNotFound added in v0.1.1

func IsNotFound(err error) bool

func IsProxyErr added in v0.1.1

func IsProxyErr(err error) bool

func Map

func Map(v interface{}) driver.Valuer

func NewAPIError added in v0.1.1

func NewAPIError(hint string, status int, respBuf []byte) error

func NewDefaultCSVFormatOptions added in v0.3.7

func NewDefaultCSVFormatOptions() map[string]string

func NewDefaultCopyOptions added in v0.3.7

func NewDefaultCopyOptions() map[string]string

func RegisterTLSConfig

func RegisterTLSConfig(key string, config *tls.Config) error

RegisterTLSConfig registers a custom tls.Config to be used with sql.Open.

func SetLogger

func SetLogger(inLogger *DBLogger)

SetLogger set a new logger of SFLogger interface for godatabend

func Tuple

func Tuple(v interface{}) driver.Valuer

Tuple converts a struct into a tuple struct{A string, B int}{"a", 1} -> ("a", 1)

func UInt64

func UInt64(u uint64) driver.Valuer

UInt64 returns uint64

Types

type APIClient

type APIClient struct {
	WaitTimeSeconds      int64
	MaxRowsInBuffer      int64
	MaxRowsPerPage       int64
	PresignedURLDisabled bool
	// contains filtered or unexported fields
}

func NewAPIClientFromConfig added in v0.3.2

func NewAPIClientFromConfig(cfg *Config) *APIClient

func (*APIClient) DoQuery

func (c *APIClient) DoQuery(ctx context.Context, query string, args []driver.Value) (*QueryResponse, error)

func (*APIClient) GetPresignedURL added in v0.3.12

func (c *APIClient) GetPresignedURL(ctx context.Context, stage *StageLocation) (*PresignedResponse, error)

func (*APIClient) InsertWithStage added in v0.3.6

func (c *APIClient) InsertWithStage(ctx context.Context, sql string, stage *StageLocation, fileFormatOptions, copyOptions map[string]string) (*QueryResponse, error)

func (*APIClient) KillQuery added in v0.4.4

func (c *APIClient) KillQuery(ctx context.Context, killURI string) error

func (*APIClient) QueryPage

func (c *APIClient) QueryPage(ctx context.Context, nextURI string) (*QueryResponse, error)

func (*APIClient) QuerySingle added in v0.3.6

func (c *APIClient) QuerySingle(ctx context.Context, query string, args []driver.Value) (*QueryResponse, error)

func (*APIClient) QuerySync

func (c *APIClient) QuerySync(ctx context.Context, query string, args []driver.Value, respCh chan QueryResponse) error

func (*APIClient) UploadToStage added in v0.3.6

func (c *APIClient) UploadToStage(ctx context.Context, stage *StageLocation, input *bufio.Reader, size int64) error

func (*APIClient) UploadToStageByAPI added in v0.3.12

func (c *APIClient) UploadToStageByAPI(ctx context.Context, stage *StageLocation, input *bufio.Reader, size int64) error

func (*APIClient) UploadToStageByPresignURL added in v0.0.4

func (c *APIClient) UploadToStageByPresignURL(ctx context.Context, stage *StageLocation, input *bufio.Reader, size int64) error

func (*APIClient) WaitForQuery added in v0.3.12

func (c *APIClient) WaitForQuery(ctx context.Context, result *QueryResponse) (*QueryResponse, error)

type APIError added in v0.1.1

type APIError struct {
	RespBody   APIErrorResponseBody
	RespText   string
	StatusCode int
	Hint       string
}

func (APIError) Error added in v0.1.1

func (e APIError) Error() string

type APIErrorResponseBody added in v0.1.1

type APIErrorResponseBody struct {
	Error   string `json:"error"`
	Message string `json:"message"`
}

func RespBody added in v0.1.1

func RespBody(err error) APIErrorResponseBody

type AccessTokenLoader added in v0.3.4

type AccessTokenLoader interface {
	// LoadAccessToken is called whenever a new request is made to the server.
	LoadAccessToken(ctx context.Context, forceRotate bool) (string, error)
}

AccessTokenLoader is used on Bearer authentication. The token may have a limited lifetime, you can rotate your token by this interface.

type AuthMethod added in v0.3.4

type AuthMethod string
const (
	AuthMethodUserPassword AuthMethod = "userPassword"
	AuthMethodAccessToken  AuthMethod = "accessToken"
)

type Config

type Config struct {
	Tenant    string // Tenant
	Warehouse string // Warehouse
	User      string // Username
	Password  string // Password (requires User)
	Database  string // Database name

	Role string // Role is the databend role you want to use for the current connection

	AccessToken       string
	AccessTokenFile   string // path to file containing access token, it can be used to rotate access token
	AccessTokenLoader AccessTokenLoader

	Host    string
	Timeout time.Duration
	/* Pagination params: WaitTimeSecs,  MaxRowsInBuffer, MaxRowsPerPage
	Pagination: critical conditions for each HTTP request to return (before all remaining result is ready to return)
	Related docs:https://databend.rs/doc/integrations/api/rest#query-request
	*/
	WaitTimeSecs    int64
	MaxRowsInBuffer int64
	MaxRowsPerPage  int64
	Location        *time.Location
	Debug           bool
	GzipCompression bool
	Params          map[string]string
	TLSConfig       string
	SSLMode         string

	// track the progress of query execution
	StatsTracker QueryStatsTracker

	// used on the storage which does not support presigned url like HDFS, local fs
	PresignedURLDisabled bool
}

Config is a set of configuration parameters

func NewConfig

func NewConfig() *Config

NewConfig creates a new config with default values

func ParseDSN

func ParseDSN(dsn string) (*Config, error)

ParseDSN parses the DSN string to a Config

func (*Config) AddParams added in v0.1.7

func (cfg *Config) AddParams(params map[string]string) (err error)

func (*Config) FormatDSN

func (cfg *Config) FormatDSN() string

FormatDSN formats the given Config into a DSN string which can be passed to the driver.

type DBLogger

type DBLogger interface {
	rlog.Ext1FieldLogger
	SetLogLevel(level string) error
	WithContext(ctx context.Context) *rlog.Entry
	SetOutput(output io.Writer)
}

DBLogger Databend logger interface to expose FieldLogger defined in logrus

func CreateDefaultLogger

func CreateDefaultLogger() DBLogger

CreateDefaultLogger return a new instance of SFLogger with default config

func GetLogger

func GetLogger() DBLogger

GetLogger return logger that is not public

type DataField

type DataField struct {
	Name string `json:"name"`
	Type string `json:"type"`
}

type DataParser

type DataParser interface {
	Parse(io.RuneScanner) (driver.Value, error)
	Type() reflect.Type
}

DataParser implements parsing of a driver value and reporting its type.

func NewDataParser

func NewDataParser(t *TypeDesc, opt *DataParserOptions) (DataParser, error)

NewDataParser creates a new DataParser based on the given TypeDesc.

type DataParserOptions

type DataParserOptions struct {
	// Location describes default location for DateTime and Date field without Timezone argument.
	Location *time.Location
	// UseDBLocation if false: always use Location, ignore DateTime argument.
	UseDBLocation bool
}

DataParserOptions describes DataParser options. Ex.: Fields Location and UseDBLocation specify timezone options.

type DatabendConn

type DatabendConn struct {
	// contains filtered or unexported fields
}

func (*DatabendConn) Begin

func (dc *DatabendConn) Begin() (driver.Tx, error)

func (*DatabendConn) Close

func (dc *DatabendConn) Close() error

Close invalidates and potentially stops any current prepared statements and transactions, marking this connection as no longer in use.

func (*DatabendConn) Commit

func (dc *DatabendConn) Commit() (err error)

Commit applies prepared statement if it exists

func (*DatabendConn) Exec

func (dc *DatabendConn) Exec(query string, args []driver.Value) (driver.Result, error)

func (*DatabendConn) ExecContext added in v0.0.9

func (dc *DatabendConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error)

func (*DatabendConn) Prepare

func (dc *DatabendConn) Prepare(query string) (driver.Stmt, error)

func (*DatabendConn) PrepareContext

func (dc *DatabendConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error)

func (*DatabendConn) Query

func (dc *DatabendConn) Query(query string, args []driver.Value) (driver.Rows, error)

func (*DatabendConn) QueryContext added in v0.0.9

func (dc *DatabendConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error)

func (*DatabendConn) Rollback

func (dc *DatabendConn) Rollback() error

Rollback cleans prepared statement

type DatabendDriver

type DatabendDriver struct {
	// contains filtered or unexported fields
}

DatabendDriver is a context of Go Driver

func (DatabendDriver) Open

func (d DatabendDriver) Open(dsn string) (driver.Conn, error)

Open creates a new connection.

func (DatabendDriver) OpenWithConfig

func (d DatabendDriver) OpenWithConfig(
	ctx context.Context,
	config Config) (
	driver.Conn, error)

OpenWithConfig creates a new connection with the given Config.

type Error

type Error struct {
	Code    int
	Message string
}

Error contains parsed information about server error

func (*Error) Error

func (e *Error) Error() string

Error implements the interface error

type FileAccessTokenData added in v0.3.4

type FileAccessTokenData struct {
	AccessToken string `toml:"access_token"`
}

type FileAccessTokenLoader added in v0.3.4

type FileAccessTokenLoader struct {
	// contains filtered or unexported fields
}

func NewFileAccessTokenLoader added in v0.3.4

func NewFileAccessTokenLoader(path string) *FileAccessTokenLoader

func (*FileAccessTokenLoader) LoadAccessToken added in v0.3.4

func (l *FileAccessTokenLoader) LoadAccessToken(ctx context.Context, forceRotate bool) (string, error)

try decode as toml, if not toml, return the plain key content

type PaginationConfig added in v0.3.4

type PaginationConfig struct {
	WaitTime        int64 `json:"wait_time_secs,omitempty"`
	MaxRowsInBuffer int64 `json:"max_rows_in_buffer,omitempty"`
	MaxRowsPerPage  int64 `json:"max_rows_per_page,omitempty"`
}

type PresignedResponse added in v0.3.6

type PresignedResponse struct {
	Method  string
	Headers map[string]string
	URL     string
}

type QueryError

type QueryError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
	Kind    string `json:"kind"`
}

func (*QueryError) Error added in v0.1.0

func (e *QueryError) Error() string

type QueryProgress

type QueryProgress struct {
	Bytes uint64 `json:"bytes"`
	Rows  uint64 `json:"rows"`
}

type QueryRequest

type QueryRequest struct {
	Session    *SessionConfig    `json:"session,omitempty"`
	SQL        string            `json:"sql"`
	Pagination *PaginationConfig `json:"pagination,omitempty"`

	StageAttachment *StageAttachmentConfig `json:"stage_attachment,omitempty"`
}

type QueryResponse

type QueryResponse struct {
	ID        string         `json:"id"`
	SessionID string         `json:"session_id"`
	Session   *SessionConfig `json:"session"`
	Schema    []DataField    `json:"schema"`
	Data      [][]string     `json:"data"`
	State     string         `json:"state"`
	Error     *QueryError    `json:"error"`
	Stats     QueryStats     `json:"stats"`
	// TODO: Affect rows
	StatsURI string `json:"stats_uri"`
	FinalURI string `json:"final_uri"`
	NextURI  string `json:"next_uri"`
	KillURI  string `json:"kill_uri"`
}

type QueryStats

type QueryStats struct {
	RunningTimeMS  float64       `json:"running_time_ms"`
	ScanProgress   QueryProgress `json:"scan_progress"`
	WriteProgress  QueryProgress `json:"write_progress"`
	ResultProgress QueryProgress `json:"result_progress"`
}

type QueryStatsTracker added in v0.4.4

type QueryStatsTracker func(queryID string, stats *QueryStats)

QueryStatsTracker is a function that will be called when query stats are updated, it can be specified in the Config struct.

type SessionConfig added in v0.3.4

type SessionConfig struct {
	Database string `json:"database,omitempty"`
	Role     string `json:"role,omitempty"`

	Settings map[string]string `json:"settings,omitempty"`
}

type StageAttachmentConfig added in v0.3.6

type StageAttachmentConfig struct {
	Location          string            `json:"location"`
	FileFormatOptions map[string]string `json:"file_format_options,omitempty"`
	CopyOptions       map[string]string `json:"copy_options,omitempty"`
}

type StageLocation added in v0.3.7

type StageLocation struct {
	Name string
	Path string
}

func (*StageLocation) String added in v0.3.7

func (sl *StageLocation) String() string

type StaticAccessTokenLoader added in v0.3.4

type StaticAccessTokenLoader struct {
	AccessToken string
}

func NewStaticAccessTokenLoader added in v0.3.4

func NewStaticAccessTokenLoader(accessToken string) *StaticAccessTokenLoader

func (*StaticAccessTokenLoader) LoadAccessToken added in v0.3.4

func (l *StaticAccessTokenLoader) LoadAccessToken(ctx context.Context, forceRotate bool) (string, error)

type TypeDesc

type TypeDesc struct {
	Name string
	Args []*TypeDesc
}

TypeDesc describes a (possibly nested) data type returned by Databend.

func ParseTypeDesc

func ParseTypeDesc(s string) (*TypeDesc, error)

ParseTypeDesc parses the type description that Databend provides.

The grammar is quite simple:

desc
    name
    name()
    name(args)
args
    desc
    desc, args

Examples:

String
Nullable(Nothing)
Array(Tuple(Tuple(String, String), Tuple(String, UInt64)))

Directories

Path Synopsis
lib

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL