client

package
v0.0.0-...-e7d2c85 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrTxnCommitted    = errors.New("txn: already committed")
	ErrTxnRolledBack   = errors.New("txn: already rolled back")
	ErrWriteConflict   = errors.New("txn: write conflict")
	ErrTxnLockNotFound = errors.New("txn: lock not found")
	ErrDeadlock        = errors.New("txn: deadlock")
)

Transaction state errors.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client provides access to gookv sub-clients (RawKV, etc.).

func NewClient

func NewClient(ctx context.Context, cfg Config) (*Client, error)

NewClient creates a client connected to PD.

func (*Client) Close

func (c *Client) Close() error

Close releases all resources.

func (*Client) PD

func (c *Client) PD() pdclient.Client

PD returns the underlying pdclient.Client for administrative operations.

func (*Client) RawKV

func (c *Client) RawKV() *RawKVClient

RawKV returns a RawKVClient for performing Raw KV operations.

func (*Client) TxnKV

func (c *Client) TxnKV() *TxnKVClient

TxnKV returns a TxnKVClient for performing transactional KV operations.

type Config

type Config struct {
	PDAddrs       []string
	DialTimeout   time.Duration // default: 5s
	MaxRetries    int           // default: 3
	StoreCacheTTL time.Duration // default: 30s
	ConnPoolSize  int           // gRPC connections per store (default: 2)
}

Config holds configuration for the client library.

type ConnPool

type ConnPool struct {
	// contains filtered or unexported fields
}

ConnPool manages a pool of gRPC connections per store address. It uses round-robin selection to distribute RPCs across connections.

func NewConnPool

func NewConnPool(cfg ConnPoolConfig) *ConnPool

NewConnPool creates a new ConnPool.

func (*ConnPool) Close

func (p *ConnPool) Close()

Close closes all pooled connections.

func (*ConnPool) Get

func (p *ConnPool) Get(addr string) (*grpc.ClientConn, error)

Get returns a gRPC connection to the given address using round-robin selection.

type ConnPoolConfig

type ConnPoolConfig struct {
	PoolSize int                                         // connections per store (default 2)
	DialFunc func(addr string) (*grpc.ClientConn, error) // injectable dialer for testing
}

ConnPoolConfig configures a ConnPool.

type KeyGroup

type KeyGroup struct {
	Info *RegionInfo
	Keys [][]byte
}

KeyGroup holds keys grouped by region for batch operations.

type KeyRange

type KeyRange struct {
	StartKey []byte
	EndKey   []byte
}

KeyRange represents a key range for batch operations.

type KvPair

type KvPair struct {
	Key   []byte
	Value []byte
}

KvPair represents a key-value pair.

type LockResolver

type LockResolver struct {
	// contains filtered or unexported fields
}

LockResolver resolves locks left by crashed or slow transactions.

func NewLockResolver

func NewLockResolver(sender *RegionRequestSender, cache *RegionCache, pdClient pdclient.Client) *LockResolver

NewLockResolver creates a new LockResolver.

func (*LockResolver) ResolveLocks

func (lr *LockResolver) ResolveLocks(ctx context.Context, locks []*kvrpcpb.LockInfo) error

ResolveLocks resolves the given locks by checking their transaction status and either committing or rolling back each one.

type PDStoreResolver

type PDStoreResolver struct {
	// contains filtered or unexported fields
}

PDStoreResolver resolves store IDs to gRPC addresses via PD, with TTL caching.

func NewPDStoreResolver

func NewPDStoreResolver(pdClient pdclient.Client, ttl time.Duration) *PDStoreResolver

NewPDStoreResolver creates a new resolver with the given PD client and cache TTL.

func (*PDStoreResolver) InvalidateStore

func (r *PDStoreResolver) InvalidateStore(storeID uint64)

InvalidateStore removes the cached address for a store.

func (*PDStoreResolver) Resolve

func (r *PDStoreResolver) Resolve(ctx context.Context, storeID uint64) (string, error)

Resolve returns the gRPC address for the given store ID. Returns from cache if fresh; queries PD if stale or missing.

type RPCFunc

type RPCFunc func(client tikvpb.TikvClient, info *RegionInfo) (regionErr *errorpb.Error, err error)

RPCFunc is the callback for making a gRPC call. It receives the TiKV client and region info (for populating request context). It should return the region_error from the response (if any) and any gRPC error.

type RawKVClient

type RawKVClient struct {
	// contains filtered or unexported fields
}

RawKVClient provides a high-level Raw KV API with automatic region routing.

func (*RawKVClient) BatchDelete

func (c *RawKVClient) BatchDelete(ctx context.Context, keys [][]byte) error

BatchDelete removes multiple keys, routing each to its region.

func (*RawKVClient) BatchGet

func (c *RawKVClient) BatchGet(ctx context.Context, keys [][]byte) ([]KvPair, error)

BatchGet retrieves values for multiple keys, routing each to its region.

func (*RawKVClient) BatchPut

func (c *RawKVClient) BatchPut(ctx context.Context, pairs []KvPair) error

BatchPut stores multiple key-value pairs, routing each to its region.

func (*RawKVClient) BatchScan

func (c *RawKVClient) BatchScan(ctx context.Context, ranges []KeyRange, eachLimit int) ([]KvPair, error)

BatchScan retrieves key-value pairs from multiple ranges, up to eachLimit per range. Each range is scanned independently, crossing region boundaries as needed.

func (*RawKVClient) Checksum

func (c *RawKVClient) Checksum(ctx context.Context, startKey, endKey []byte) (uint64, uint64, uint64, error)

Checksum computes a checksum over a key range, spanning region boundaries.

func (*RawKVClient) Close

func (c *RawKVClient) Close() error

Close is a no-op for RawKVClient; the parent Client manages lifecycle.

func (*RawKVClient) CompareAndSwap

func (c *RawKVClient) CompareAndSwap(ctx context.Context, key, value, prevValue []byte, prevNotExist bool) (bool, []byte, error)

CompareAndSwap atomically compares and swaps a value.

func (*RawKVClient) Delete

func (c *RawKVClient) Delete(ctx context.Context, key []byte) error

Delete removes a key.

func (*RawKVClient) DeleteRange

func (c *RawKVClient) DeleteRange(ctx context.Context, startKey, endKey []byte) error

DeleteRange deletes all keys in a range, spanning region boundaries.

func (*RawKVClient) Get

func (c *RawKVClient) Get(ctx context.Context, key []byte) ([]byte, bool, error)

Get retrieves the value for a key.

func (*RawKVClient) GetKeyTTL

func (c *RawKVClient) GetKeyTTL(ctx context.Context, key []byte) (uint64, error)

GetKeyTTL returns the remaining TTL for a key.

func (*RawKVClient) Put

func (c *RawKVClient) Put(ctx context.Context, key, value []byte) error

Put stores a key-value pair.

func (*RawKVClient) PutWithTTL

func (c *RawKVClient) PutWithTTL(ctx context.Context, key, value []byte, ttl uint64) error

PutWithTTL stores a key-value pair with a TTL in seconds.

func (*RawKVClient) Scan

func (c *RawKVClient) Scan(ctx context.Context, startKey, endKey []byte, limit int) ([]KvPair, error)

Scan retrieves key-value pairs in a range, transparently crossing region boundaries.

type RegionCache

type RegionCache struct {
	// contains filtered or unexported fields
}

RegionCache caches key-to-region mappings with PD fallback.

func NewRegionCache

func NewRegionCache(pdClient pdclient.Client, resolver *PDStoreResolver) *RegionCache

NewRegionCache creates a new empty region cache.

func (*RegionCache) GroupKeysByRegion

func (c *RegionCache) GroupKeysByRegion(ctx context.Context, keys [][]byte) (map[uint64]*KeyGroup, error)

GroupKeysByRegion groups a set of keys by their region.

func (*RegionCache) InsertFromEpochNotMatch

func (c *RegionCache) InsertFromEpochNotMatch(ctx context.Context, region *metapb.Region, resolver *PDStoreResolver)

InsertFromEpochNotMatch inserts a region from an EpochNotMatch server response directly into the cache, bypassing PD. This avoids waiting for PD to propagate split metadata, which is the root cause of intermittent Put timeouts during splits.

func (*RegionCache) InvalidateRegion

func (c *RegionCache) InvalidateRegion(regionID uint64)

InvalidateRegion removes the cached entry for the given region ID.

func (*RegionCache) LocateKey

func (c *RegionCache) LocateKey(ctx context.Context, key []byte) (*RegionInfo, error)

LocateKey returns the RegionInfo for the given key. Returns from cache if available; queries PD on cache miss. The key is a raw user key which is encoded for comparison against region boundaries (which use memcomparable encoding).

func (*RegionCache) UpdateLeader

func (c *RegionCache) UpdateLeader(regionID uint64, leader *metapb.Peer, storeAddr string)

UpdateLeader updates the cached leader for a region with the given store address.

type RegionInfo

type RegionInfo struct {
	Region    *metapb.Region
	Leader    *metapb.Peer
	StoreAddr string
}

RegionInfo holds cached region metadata with its leader and store address.

type RegionRequestSender

type RegionRequestSender struct {
	// contains filtered or unexported fields
}

RegionRequestSender sends RPCs to the correct store for a given key, handling region errors and retries with cache invalidation.

func NewRegionRequestSender

func NewRegionRequestSender(cache *RegionCache, resolver *PDStoreResolver, maxRetries int, dialTimeout time.Duration) *RegionRequestSender

NewRegionRequestSender creates a new sender with a connection pool.

func NewRegionRequestSenderWithPool

func NewRegionRequestSenderWithPool(cache *RegionCache, resolver *PDStoreResolver, maxRetries int, pool *ConnPool) *RegionRequestSender

NewRegionRequestSenderWithPool creates a sender with a custom ConnPool.

func (*RegionRequestSender) Close

func (s *RegionRequestSender) Close()

Close closes all cached connections.

func (*RegionRequestSender) SendToRegion

func (s *RegionRequestSender) SendToRegion(ctx context.Context, key []byte, rpcFn RPCFunc) error

SendToRegion locates the region for the given key, sends the RPC, and retries on retriable region errors. Retries up to maxRetries times with exponential backoff (100ms, 200ms, 400ms, ...), capped at 2s per sleep. Also respects the context deadline.

type TxnHandle

type TxnHandle struct {
	// contains filtered or unexported fields
}

TxnHandle represents an active transaction with a buffered mutation set.

func (*TxnHandle) BatchGet

func (t *TxnHandle) BatchGet(ctx context.Context, keys [][]byte) ([]KvPair, error)

BatchGet reads multiple keys. It checks the local buffer first, then issues KvBatchGet RPCs for remaining keys grouped by region.

func (*TxnHandle) Commit

func (t *TxnHandle) Commit(ctx context.Context) error

Commit commits the transaction using the two-phase commit protocol.

func (*TxnHandle) Delete

func (t *TxnHandle) Delete(ctx context.Context, key []byte) error

Delete buffers a delete mutation. In pessimistic mode, it also acquires a pessimistic lock.

func (*TxnHandle) Get

func (t *TxnHandle) Get(ctx context.Context, key []byte) ([]byte, error)

Get reads the value for key. It first checks the local mutation buffer, then falls back to a KvGet RPC. If the key is locked by another transaction, it resolves the lock and retries.

func (*TxnHandle) Rollback

func (t *TxnHandle) Rollback(ctx context.Context) error

Rollback aborts the transaction by rolling back all prewritten/locked keys. If the commit status is unknown (primary commit attempted but status indeterminate due to region unavailability), Rollback is a no-op to avoid rolling back secondaries while the primary might be committed. Lock resolution will handle the correct outcome once regions recover.

func (*TxnHandle) Scan

func (t *TxnHandle) Scan(ctx context.Context, startKey, endKey []byte, limit int) ([]KvPair, error)

Scan retrieves key-value pairs in [startKey, endKey) at the transaction's snapshot timestamp. It crosses region boundaries transparently, resolves locks encountered during the scan, and merges results with the local mutation buffer (uncommitted writes and deletes).

func (*TxnHandle) Set

func (t *TxnHandle) Set(ctx context.Context, key, value []byte) error

Set buffers a put mutation. In pessimistic mode, it also acquires a pessimistic lock.

func (*TxnHandle) StartTS

func (t *TxnHandle) StartTS() txntypes.TimeStamp

StartTS returns the transaction's start timestamp.

type TxnKVClient

type TxnKVClient struct {
	// contains filtered or unexported fields
}

TxnKVClient provides transactional KV operations.

func NewTxnKVClient

func NewTxnKVClient(sender *RegionRequestSender, cache *RegionCache, pdClient pdclient.Client) *TxnKVClient

NewTxnKVClient creates a new TxnKVClient.

func (*TxnKVClient) Begin

func (c *TxnKVClient) Begin(ctx context.Context, opts ...TxnOption) (*TxnHandle, error)

Begin starts a new transaction with the given options.

func (*TxnKVClient) Close

func (c *TxnKVClient) Close() error

Close releases resources held by the TxnKVClient.

type TxnMode

type TxnMode int

TxnMode defines the transaction mode.

const (
	// TxnModeOptimistic is the default optimistic transaction mode.
	TxnModeOptimistic TxnMode = iota
	// TxnModePessimistic uses pessimistic locking.
	TxnModePessimistic
)

type TxnOption

type TxnOption func(*TxnOptions)

TxnOption is a functional option for configuring TxnOptions.

func With1PC

func With1PC() TxnOption

With1PC enables the 1PC optimization for single-region transactions.

func WithAsyncCommit

func WithAsyncCommit() TxnOption

WithAsyncCommit enables the async commit protocol.

func WithLockTTL

func WithLockTTL(ttl uint64) TxnOption

WithLockTTL sets the lock TTL in milliseconds.

func WithPessimistic

func WithPessimistic() TxnOption

WithPessimistic enables pessimistic transaction mode.

type TxnOptions

type TxnOptions struct {
	Mode           TxnMode
	UseAsyncCommit bool
	Try1PC         bool
	LockTTL        uint64
}

TxnOptions holds options for beginning a transaction.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL