ydbgoquery

package module
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 12, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

README

CI Go Report Card Coverage Go version License

ydb-go-query

Experimental lightweight YDB client that focuses on query service. Work in progress (API might change).

Features

  • Query execution
  • Transactions
  • Location aware load balancing with continuous 'out-of-band' nodes discovery
  • Session pool with session recycling and auto warm-up
  • Authentication: user-pass and Yandex Cloud IAM (for serverless YDB).
  • Works with and exposes bare YDB GRPC field types github.com/ydb-platform/ydb-go-genproto/protos/Ydb (but provides type helpers for convenience).
  • Ready status with high and low thresholds.

TODO

  • Ready status
  • DC/location priorities for balancer
  • Migrations
  • Scripts
  • More type helpers
  • Retries
  • OpenMetrics

Quickstart

package main

import (
	"context"
	"fmt"
	ydb "github.com/adwski/ydb-go-query"
)

// main opens a client against a local YDB node, runs a trivial query
// and prints either the failure or the returned rows.
func main() {
    ctx := context.Background()

    // plain connection: no authentication and no TLS
    cfg := ydb.Config{
        InitialNodes: []string{"127.0.0.1:2136"}, // endpoints used for discovery
        DB:           "/local",                  // database path
    }
    client, err := ydb.Open(ctx, cfg)
    if err != nil {
        panic(err)
    }
    defer client.Close()

    // the query runs outside of a transaction;
    // Exec blocks until a connection (and session) is available
    result, err := client.QueryCtx().Exec(ctx, `SELECT 1`)
    if err != nil {
        // transport-level failure
        fmt.Printf("YDB error: %v\n", err)
        return
    }
    if result.Err() != nil {
        // the query itself was rejected
        fmt.Printf("Query error: %v\nIssues: \n%v\n", result.Err(), result.Issues())
        return
    }

    fmt.Printf("Ok!\nstats: %v\ncols: %v\n", result.Stats(), result.Cols())
    for i, r := range result.Rows() {
        fmt.Printf("row %d: %v\n", i, r)
    }
}

More config options

// The options below are passed as trailing arguments to ydb.Open(ctx, cfg, ...).

// init logging with external logger.
// "debug" here is an internal level (not related to zerolog)
ydb.WithZeroLogger(zerolog.New(zerolog.NewConsoleWriter()), "debug"),

// zap is also available
// WithZapLogger(zapLogger, "debug"),

// Query execution timeout, default 5 minutes.
ydb.WithQueryTimeout(5*time.Minute),

// Session pool size, default is 10.
ydb.WithSessionPoolSize(50),

// Session pool ready Low and High thresholds.
// If the amount of ready sessions rises above the High threshold,
// then client.Ready() returns true.
// If the amount of ready sessions drops below the Low threshold,
// then client.Ready() returns false.
// Default is low=0, high=50
// NOTE(review): the documented signature is
// WithSessionPoolReadyThresholds(high, low uint) - confirm the argument order used here.
ydb.WithSessionPoolReadyThresholds(10, 80),

// Session create timeout, default is 3 seconds.
ydb.WithSessionCreateTimeout(5*time.Second),

// Amount of connections to create per each discovered endpoint.
// Default is 2
ydb.WithConnectionsPerEndpoint(4),

// Location preference.
// Client will use connections in the first location from this list;
// if all connections in this location are not alive, it will move to the next.
// If there are no alive connections in either of these locations,
// alive connections from other discovered locations (if any) will be used.
// NOTE(review): the documented v0.0.2 signature is
// WithLocationPreference(pref string), not []string - confirm the expected argument.
ydb.WithLocationPreference([]string{"ru-central1-b", "ru-central1-a"}),

// tx mode, serializable rw is default
ydb.WithSerializableReadWrite(),

Contexts

The context passed to ydb.Open(ctx, ...) acts as the parent context for every running component of the client. If it is canceled, the client shuts down. It is fine to cancel this context when you want to terminate your app, but make sure to call client.Close() to wait for cleanup to complete. It is also acceptable to just call client.Close() without canceling this context.

Context provided in Exec(ctx) is the query context, used internally for timeouts and grpc calls. You can also cancel this context to abort query execution.

Use with serverless YDB in Yandex Cloud

client, err := ydb.Open(ctx,
    ydb.Config{
        InitialNodes: []string{"ydb.serverless.yandexcloud.net:2135"},
        DB:           "/ru-central1/b1g22ge123t0me6ngsfg/etn4nlihce23r24fgnk32p",
    },
    ydb.WithTransportTLS(),     // use TLS
    ydb.WithYCAuthFile("/path/to/iam/key.json"), // use YC authorization
)

Exec simple queries

client.QueryCtx() returns query execution context which holds global configuration that all queries use. At the moment it controls transaction mode and query timeout.

// Open a client against a local YDB node; the error is checked instead of discarded.
client, err := ydb.Open(ctx, ydb.Config{
    InitialNodes: []string{"127.0.0.1:2136"},
    DB:           "/local",
})
if err != nil {
    panic(err)
}

qCtx := client.QueryCtx() // get query execution context

// DDL and DML queries must be executed outside of transaction
// Exec takes the query context as its first argument.
res, err := qCtx.Exec(ctx, `CREATE TABLE users (
    user_id Uint64,
    first_name Utf8,
    last_name Utf8,
    email Utf8,
    registered_ts Uint64,
    PRIMARY KEY (user_id))`)

if err != nil {
    panic(err)
}

qCtx.Query() executes each query in a single transaction. The transaction mode is derived from query.Ctx.

res, err = qCtx.Query(`DECLARE $user_id AS Uint64;
    DECLARE $first_name AS Utf8;
    DECLARE $last_name AS Utf8;
    DECLARE $email AS Utf8;
    DECLARE $registered_ts AS Uint64;
    UPSERT INTO users (
        user_id, first_name, last_name, email, registered_ts
    ) VALUES (
        $user_id, $first_name, $last_name, $email, $registered_ts)`).
    Param("$user_id", types.Uint64(123)).
    Param("$first_name", types.UTF8("test")).
    Param("$last_name", types.UTF8("test")).
    Param("$email", types.UTF8("test@test.test")).
    Param("$registered_ts", types.Uint64(1726836887)).
    Exec(ctx)

qCtx.Query() is used for select queries as well.

// Run a select and print every returned row.
res, err := qCtx.Query("SELECT * FROM users").Exec(ctx)
if err != nil {
    panic(err) // io error
}
if res.Err() != nil {
    panic(res.Err()) // query error
}
// iterate over the rows of the result obtained above
for _, row := range res.Rows() {
    fmt.Printf("row: %v\n", row)
}

You can gather result rows with a custom function provided via Collect(). This function is called every time a part of the result arrives. res.Rows() will be empty in this case.

res, err := qCtx.Query("SELECT * FROM users").
    Collect(func(rows []*Ydb.Value) error {
        for _, row := range rows {
            fmt.Printf("row: %v\n", row)
        }
        return nil
    }).Exec(ctx)

Transactions

Tx() creates a transaction entity that allows executing several queries in one transaction. Under the hood it acquires and holds a YDB session until the transaction is finished. The transaction mode is inherited from query.Ctx. Read more about transactions here: https://ydb.tech/docs/en/concepts/transactions

// Start a transaction; it holds a YDB session until committed or rolled back.
tx, err := qCtx.Tx(ctx)
if err != nil {
    panic(err)
}

// Execute a parameterized upsert inside the transaction.
res, err = tx.Query(`DECLARE $user_id AS Uint64;
    DECLARE $first_name AS Utf8;
    DECLARE $last_name AS Utf8;
    DECLARE $email AS Utf8;
    DECLARE $registered_ts AS Uint64;
    UPSERT INTO users (
        user_id, first_name, last_name, email, registered_ts
    ) VALUES (
        $user_id, $first_name, $last_name, $email, $registered_ts)`).
    Param("$user_id", types.Uint64(123)).
    Param("$first_name", types.UTF8("test")).
    Param("$last_name", types.UTF8("test")).
    Param("$email", types.UTF8("test@test.test")).
    Param("$registered_ts", types.Uint64(1726836887)).
    Exec(ctx)

if err != nil {
    panic(err)
}

// more queries here
// ...

// commit transaction
err = tx.Commit(ctx)
if err != nil {
    panic(err)
}

You can also send an inline commit together with the last query in the current transaction. This way the transaction is committed immediately after (successful) query execution, and an explicit tx.Commit() call is not needed. This approach saves you one round trip to YDB.

res, err := tx.Query("...").
    Param("$qwe", types.UTF8("test")).
    Commit().
    Exec(ctx)
// no need to call tx.Commit() after this

Uncommitted transactions can be rolled back with tx.Rollback().

Feedback

If you've spotted a bug or are interested in an improvement, feel free to open an issue. PRs are also welcome.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoInitialNodes           = errors.New("no initial nodes was provided")
	ErrDBEmpty                  = errors.New("db is empty")
	ErrDiscoveryTransportCreate = errors.New("discovery transport create error")
)
View Source
var ErrAuthTransport = errors.New("unable to create auth transport")
View Source
var (
	ErrAuthentication = errors.New("authentication failed")
)

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func Open

func Open(ctx context.Context, cfg Config, opts ...Option) (*Client, error)

func (*Client) Close

func (c *Client) Close()

func (*Client) QueryCtx

func (c *Client) QueryCtx() *qq.Ctx

func (*Client) Ready added in v0.0.2

func (c *Client) Ready() bool

type Config

type Config struct {
	DB           string
	InitialNodes []string
	// contains filtered or unexported fields
}

type Option

type Option func(context.Context, *Config) error

func WithConnectionsPerEndpoint added in v0.0.2

func WithConnectionsPerEndpoint(connections int) Option

func WithLocationPreference added in v0.0.2

func WithLocationPreference(pref string) Option

func WithLogger

func WithLogger(log logger.Logger) Option

func WithOnlineReadOnly

func WithOnlineReadOnly() Option

func WithOnlineReadOnlyInconsistent

func WithOnlineReadOnlyInconsistent() Option

func WithQueryTimeout added in v0.0.1

func WithQueryTimeout(timeout time.Duration) Option

func WithSerializableReadWrite

func WithSerializableReadWrite() Option

func WithSessionCreateTimeout

func WithSessionCreateTimeout(timeout time.Duration) Option

func WithSessionPoolReadyThresholds added in v0.0.2

func WithSessionPoolReadyThresholds(high, low uint) Option

func WithSessionPoolSize

func WithSessionPoolSize(size uint) Option

func WithSnapshotReadOnly

func WithSnapshotReadOnly() Option

func WithStaleReadOnly

func WithStaleReadOnly() Option

func WithTransportTLS

func WithTransportTLS() Option

func WithUserPass

func WithUserPass(username, password string) Option

func WithYCAuthBytes

func WithYCAuthBytes(iamKeyBytes []byte) Option

func WithYCAuthFile

func WithYCAuthFile(filename string) Option

func WithZapLogger added in v0.0.1

func WithZapLogger(log *zap.Logger, level string) Option

func WithZeroLogger

func WithZeroLogger(log zerolog.Logger, level string) Option

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL