client

package
v1.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 11, 2025 License: Apache-2.0 Imports: 11 Imported by: 0

README

Kevo Go Client SDK

This package provides a Go client for connecting to a Kevo database server. The client uses the gRPC transport layer to communicate with the server and provides an idiomatic Go API for working with Kevo.

Features

  • Simple key-value operations (Get, Put, Delete)
  • Batch operations for atomic writes
  • Transaction support with ACID guarantees
  • Iterator API for efficient range scans
  • Connection pooling and automatic retries
  • TLS support for secure communication
  • Comprehensive error handling
  • Configurable timeouts and backoff strategies

Installation

go get github.com/KevoDB/kevo

Quick Start

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/KevoDB/kevo/pkg/client"
	_ "github.com/KevoDB/kevo/pkg/grpc/transport" // Register gRPC transport
)

func main() {
	// Create a client with default options
	options := client.DefaultClientOptions()
	options.Endpoint = "localhost:50051"

	c, err := client.NewClient(options)
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}

	// Connect to the server
	ctx := context.Background()
	if err := c.Connect(ctx); err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer c.Close()

	// Basic key-value operations
	key := []byte("hello")
	value := []byte("world")

	// Store a value
	if _, err := c.Put(ctx, key, value, true); err != nil {
		log.Fatalf("Put failed: %v", err)
	}

	// Retrieve a value
	val, found, err := c.Get(ctx, key)
	if err != nil {
		log.Fatalf("Get failed: %v", err)
	}

	if found {
		fmt.Printf("Value: %s\n", val)
	} else {
		fmt.Println("Key not found")
	}

	// Delete a value
	if _, err := c.Delete(ctx, key, true); err != nil {
		log.Fatalf("Delete failed: %v", err)
	}
}

Configuration Options

The client can be configured using the ClientOptions struct:

options := client.ClientOptions{
	// Connection options
	Endpoint:        "localhost:50051",
	ConnectTimeout:  5 * time.Second,
	RequestTimeout:  10 * time.Second,
	TransportType:   "grpc",
	PoolSize:        5,

	// Security options
	TLSEnabled:      true,
	CertFile:        "/path/to/cert.pem",
	KeyFile:         "/path/to/key.pem",
	CAFile:          "/path/to/ca.pem",

	// Retry options
	MaxRetries:      3,
	InitialBackoff:  100 * time.Millisecond,
	MaxBackoff:      2 * time.Second,
	BackoffFactor:   1.5,
	RetryJitter:     0.2,

	// Performance options
	Compression:     client.CompressionGzip,
	MaxMessageSize:  16 * 1024 * 1024, // 16MB
}

Transactions

// Begin a transaction
tx, err := client.BeginTransaction(ctx, false) // readOnly=false
if err != nil {
	log.Fatalf("Failed to begin transaction: %v", err)
}

// Perform operations within the transaction
success, err := tx.Put(ctx, []byte("key1"), []byte("value1"))
if err != nil {
	tx.Rollback(ctx) // Rollback on error
	log.Fatalf("Transaction put failed: %v", err)
}

// Commit the transaction
if err := tx.Commit(ctx); err != nil {
	log.Fatalf("Transaction commit failed: %v", err)
}

Scans and Iterators

// Set up scan options
scanOptions := client.ScanOptions{
	Prefix:   []byte("user:"),  // Optional prefix
	StartKey: []byte("user:1"), // Optional start key (inclusive)
	EndKey:   []byte("user:9"), // Optional end key (exclusive)
	Limit:    100,              // Optional limit
}

// Create a scanner
scanner, err := client.Scan(ctx, scanOptions)
if err != nil {
	log.Fatalf("Failed to create scanner: %v", err)
}
defer scanner.Close()

// Iterate through results
for scanner.Next() {
	fmt.Printf("Key: %s, Value: %s\n", scanner.Key(), scanner.Value())
}

// Check for errors after iteration
if err := scanner.Error(); err != nil {
	log.Fatalf("Scan error: %v", err)
}

Batch Operations

// Create a batch of operations
operations := []client.BatchOperation{
	{Type: "put", Key: []byte("key1"), Value: []byte("value1")},
	{Type: "put", Key: []byte("key2"), Value: []byte("value2")},
	{Type: "delete", Key: []byte("old-key")},
}

// Execute the batch atomically
success, err := client.BatchWrite(ctx, operations, true)
if err != nil {
	log.Fatalf("Batch write failed: %v", err)
}

Error Handling and Retries

The client automatically handles retries for transient errors using exponential backoff with jitter. You can configure the retry behavior through the retry fields of ClientOptions (MaxRetries, InitialBackoff, MaxBackoff, BackoffFactor, and RetryJitter).

// Manual retry with custom policy
err = client.RetryWithBackoff(
	ctx,
	func() error {
		_, _, err := c.Get(ctx, key)
		return err
	},
	3,                     // maxRetries
	100*time.Millisecond,  // initialBackoff
	2*time.Second,         // maxBackoff
	2.0,                   // backoffFactor
	0.2,                   // jitter
)

Database Statistics

// Get database statistics
stats, err := client.GetStats(ctx)
if err != nil {
	log.Fatalf("Failed to get stats: %v", err)
}

fmt.Printf("Key count: %d\n", stats.KeyCount)
fmt.Printf("Storage size: %d bytes\n", stats.StorageSize)
fmt.Printf("MemTable count: %d\n", stats.MemtableCount)
fmt.Printf("SSTable count: %d\n", stats.SstableCount)
fmt.Printf("Write amplification: %.2f\n", stats.WriteAmplification)
fmt.Printf("Read amplification: %.2f\n", stats.ReadAmplification)

Compaction

// Trigger compaction
success, err := client.Compact(ctx, false) // force=false
if err != nil {
	log.Fatalf("Compaction failed: %v", err)
}

Documentation

Index

Constants

View Source
const (
	CompressionNone   = transport.CompressionNone
	CompressionGzip   = transport.CompressionGzip
	CompressionSnappy = transport.CompressionSnappy
)

Compression options

Variables

View Source
var (
	// ErrNotConnected indicates the client is not connected to the server
	ErrNotConnected = errors.New("not connected to server")

	// ErrInvalidOptions indicates invalid client options
	ErrInvalidOptions = errors.New("invalid client options")

	// ErrTimeout indicates a request timed out
	ErrTimeout = errors.New("request timed out")

	// ErrKeyNotFound indicates a key was not found
	ErrKeyNotFound = errors.New("key not found")

	// ErrTransactionConflict indicates a transaction conflict occurred
	ErrTransactionConflict = errors.New("transaction conflict detected")
)

Errors that can occur during client operations

View Source
var ErrTransactionClosed = errors.New("transaction is closed")

ErrTransactionClosed is returned when attempting to use a closed transaction

Functions

func CalculateExponentialBackoff

func CalculateExponentialBackoff(
	attempt int,
	initialBackoff time.Duration,
	maxBackoff time.Duration,
	backoffFactor float64,
	jitter float64,
) time.Duration

CalculateExponentialBackoff calculates the backoff time for a given attempt

func IsRetryableError

func IsRetryableError(err error) bool

IsRetryableError returns true if the error is considered retryable

func RetryWithBackoff

func RetryWithBackoff(
	ctx context.Context,
	fn RetryableFunc,
	maxRetries int,
	initialBackoff time.Duration,
	maxBackoff time.Duration,
	backoffFactor float64,
	jitter float64,
) error

RetryWithBackoff executes a function with exponential backoff and jitter

Types

type BatchOperation

type BatchOperation struct {
	Type  string // "put" or "delete"
	Key   []byte
	Value []byte // only used for "put" operations
}

BatchOperation represents a single operation in a batch

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a connection to a Kevo database server

func NewClient

func NewClient(options ClientOptions) (*Client, error)

NewClient creates a new Kevo client with the given options

func (*Client) BatchWrite

func (c *Client) BatchWrite(ctx context.Context, operations []BatchOperation, sync bool) (bool, error)

BatchWrite performs multiple operations in a single atomic batch. If connected to a replica, it will automatically route the batch to the primary.

func (*Client) BeginTransaction

func (c *Client) BeginTransaction(ctx context.Context, readOnly bool) (*Transaction, error)

BeginTransaction starts a new transaction

func (*Client) Close

func (c *Client) Close() error

Close closes all connections to servers

func (*Client) Compact

func (c *Client) Compact(ctx context.Context, force bool) (bool, error)

Compact triggers compaction of the database

func (*Client) Connect

func (c *Client) Connect(ctx context.Context) error

Connect establishes a connection to the server and discovers the replication topology if available

func (*Client) Delete

func (c *Client) Delete(ctx context.Context, key []byte, sync bool) (bool, error)

Delete removes a key-value pair. If connected to a replica, it will automatically route the delete to the primary.

func (*Client) Get

func (c *Client) Get(ctx context.Context, key []byte) ([]byte, bool, error)

Get retrieves a value by key. If connected to a primary with replicas, it will route reads to a replica.

func (*Client) GetReplicationInfo added in v1.3.0

func (c *Client) GetReplicationInfo() (*NodeInfo, error)

GetReplicationInfo returns information about the current node and replication topology

func (*Client) GetStats

func (c *Client) GetStats(ctx context.Context) (*Stats, error)

GetStats retrieves database statistics

func (*Client) IsConnected

func (c *Client) IsConnected() bool

IsConnected returns whether the client is connected to the server

func (*Client) IsPrimary added in v1.3.0

func (c *Client) IsPrimary() bool

IsPrimary returns true if the connected node is a primary

func (*Client) IsReplica added in v1.3.0

func (c *Client) IsReplica() bool

IsReplica returns true if the connected node is a replica

func (*Client) IsStandalone added in v1.3.0

func (c *Client) IsStandalone() bool

IsStandalone returns true if the connected node is standalone (not part of replication)

func (*Client) Put

func (c *Client) Put(ctx context.Context, key, value []byte, sync bool) (bool, error)

Put stores a key-value pair. If connected to a replica, it will automatically route the write to the primary.

func (*Client) RefreshTopology added in v1.3.0

func (c *Client) RefreshTopology(ctx context.Context) error

RefreshTopology updates the replication topology information

func (*Client) Scan

func (c *Client) Scan(ctx context.Context, options ScanOptions) (Scanner, error)

Scan creates a scanner to iterate over keys in the database

type ClientOptions

type ClientOptions struct {
	// Connection options
	Endpoint       string        // Server address
	ConnectTimeout time.Duration // Timeout for connection attempts
	RequestTimeout time.Duration // Default timeout for requests
	TransportType  string        // Transport type (e.g. "grpc")
	PoolSize       int           // Connection pool size

	// Security options
	TLSEnabled bool   // Enable TLS
	CertFile   string // Client certificate file
	KeyFile    string // Client key file
	CAFile     string // CA certificate file

	// Retry options
	MaxRetries     int           // Maximum number of retries
	InitialBackoff time.Duration // Initial retry backoff
	MaxBackoff     time.Duration // Maximum retry backoff
	BackoffFactor  float64       // Backoff multiplier
	RetryJitter    float64       // Random jitter factor

	// Performance options
	Compression    CompressionType // Compression algorithm
	MaxMessageSize int             // Maximum message size

	// Keepalive options
	KeepaliveTime    time.Duration // Time between keepalive pings (0 for default)
	KeepaliveTimeout time.Duration // Time to wait for ping ack (0 for default)
}

ClientOptions configures a Kevo client

func DefaultClientOptions

func DefaultClientOptions() ClientOptions

DefaultClientOptions returns sensible default client options

type CompressionType

type CompressionType = transport.CompressionType

CompressionType represents a compression algorithm

type KeyValue

type KeyValue struct {
	Key   []byte
	Value []byte
}

KeyValue represents a key-value pair from a scan

type NodeInfo added in v1.3.0

type NodeInfo struct {
	Role         string        // "primary", "replica", or "standalone"
	PrimaryAddr  string        // Address of the primary node
	Replicas     []ReplicaInfo // Available replica nodes
	LastSequence uint64        // Last applied sequence number
	ReadOnly     bool          // Whether the node is in read-only mode
}

NodeInfo contains information about the server node and topology

type ReplicaInfo added in v1.3.0

type ReplicaInfo struct {
	Address      string            // Host:port of the replica
	LastSequence uint64            // Last applied sequence number
	Available    bool              // Whether the replica is available
	Region       string            // Optional region information
	Meta         map[string]string // Additional metadata
}

ReplicaInfo represents information about a replica node

type RetryableFunc

type RetryableFunc func() error

RetryableFunc is a function that can be retried

type ScanOptions

type ScanOptions struct {
	// Prefix limits the scan to keys with this prefix
	Prefix []byte
	// Suffix limits the scan to keys with this suffix
	Suffix []byte
	// StartKey sets the starting point for the scan (inclusive)
	StartKey []byte
	// EndKey sets the ending point for the scan (exclusive)
	EndKey []byte
	// Limit sets the maximum number of key-value pairs to return
	Limit int32
}

ScanOptions configures a scan operation

type Scanner

type Scanner interface {
	// Next advances the scanner to the next key-value pair
	Next() bool
	// Key returns the current key
	Key() []byte
	// Value returns the current value
	Value() []byte
	// Error returns any error that occurred during iteration
	Error() error
	// Close releases resources associated with the scanner
	Close() error
}

Scanner interface for iterating through keys and values

type Stats

type Stats struct {
	KeyCount           int64
	StorageSize        int64
	MemtableCount      int32
	SstableCount       int32
	WriteAmplification float64
	ReadAmplification  float64
}

Stats contains database statistics

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

Transaction represents a database transaction

func (*Transaction) Commit

func (tx *Transaction) Commit(ctx context.Context) error

Commit commits the transaction

func (*Transaction) Delete

func (tx *Transaction) Delete(ctx context.Context, key []byte) (bool, error)

Delete removes a key-value pair within the transaction

func (*Transaction) Get

func (tx *Transaction) Get(ctx context.Context, key []byte) ([]byte, bool, error)

Get retrieves a value by key within the transaction

func (*Transaction) Put

func (tx *Transaction) Put(ctx context.Context, key, value []byte) (bool, error)

Put stores a key-value pair within the transaction

func (*Transaction) Rollback

func (tx *Transaction) Rollback(ctx context.Context) error

Rollback aborts the transaction

func (*Transaction) Scan

func (tx *Transaction) Scan(ctx context.Context, options ScanOptions) (Scanner, error)

Scan creates a scanner to iterate over keys in the transaction

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL