client

package
v0.0.0-...-024101a Latest
Warning

This package is not in the latest version of its module.
Published: Mar 25, 2015 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package client provides clients for accessing the various externally-facing Cockroach database endpoints.

KV Client

The KV client is a fully-featured client of Cockroach's key-value database. It provides a simple, synchronous interface well-suited to parallel updates and queries.

The simplest way to use the client is through the Call method. Call synchronously invokes the method, fills in the supplied reply struct, and returns an error if the call failed. The example below shows a get followed by a put.

kv := client.NewKV(client.NewHTTPSender("localhost:8080", &http.Transport{TLSClientConfig: tlsConfig}), nil)

getResp := &proto.GetResponse{}
if err := kv.Call(proto.Get, proto.GetArgs(proto.Key("a")), getResp); err != nil {
  log.Fatal(err)
}
putResp := &proto.PutResponse{}
if err := kv.Call(proto.Put, proto.PutArgs(proto.Key("b"), getResp.Value.Bytes), putResp); err != nil {
  log.Fatal(err)
}

The API is synchronous, but accommodates efficient parallel updates and queries using the Prepare method. An arbitrary number of Prepare invocations are followed by a call to Flush. Until the Flush, requests are buffered locally in anticipation of being sent to Cockroach as part of a batch. Flush batches the prepared calls and sends them together. Note, however, that API calls which are buffered and sent together are not guaranteed to have atomic semantics; a transaction must be used to guarantee atomicity. The following example performs two scans in parallel and then sends a sequence of puts in parallel:

kv := client.NewKV(client.NewHTTPSender("localhost:8080", &http.Transport{TLSClientConfig: tlsConfig}), nil)

acResp, xzResp := &proto.ScanResponse{}, &proto.ScanResponse{}
kv.Prepare(proto.Scan, proto.ScanArgs(proto.Key("a"), proto.Key("c").Next()), acResp)
kv.Prepare(proto.Scan, proto.ScanArgs(proto.Key("x"), proto.Key("z").Next()), xzResp)

// Flush sends both scans in parallel and returns first error or nil.
if err := kv.Flush(); err != nil {
  log.Fatal(err)
}

// Append maximum value from "a"-"c" to all values from "x"-"z".
max := []byte(nil)
for _, keyVal := range acResp.Rows {
  if bytes.Compare(max, keyVal.Value.Bytes) < 0 {
    max = keyVal.Value.Bytes
  }
}
for _, keyVal := range xzResp.Rows {
  putReq := proto.PutArgs(keyVal.Key, bytes.Join([][]byte{keyVal.Value.Bytes, max}, []byte(nil)))
  kv.Prepare(proto.Put, putReq, &proto.PutResponse{})
}

// Flush all puts for parallel execution.
if err := kv.Flush(); err != nil {
  log.Fatal(err)
}
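
The buffering behaviour described above can be pictured with a small stand-alone sketch. The batcher type, its prepare and flush methods, and the in-memory map are hypothetical names invented for illustration; they are not part of the client package and make no claim about how KV is implemented internally. The sketch assumes that a failed call does not prevent later calls in the same batch from running, which is one way to see why a batch alone is not atomic.

```go
package main

import (
	"errors"
	"fmt"
)

// call is a hypothetical stand-in for one buffered API call.
type call struct {
	key   string
	value string
	err   error // filled in by flush
}

// batcher buffers calls locally until flush is invoked.
type batcher struct {
	store   map[string]string
	pending []*call
}

func newBatcher() *batcher {
	return &batcher{store: map[string]string{}}
}

// prepare buffers c; nothing is executed yet.
func (b *batcher) prepare(c *call) {
	b.pending = append(b.pending, c)
}

// flush executes every buffered call in the order it was prepared and
// returns the first error encountered, or nil. Later calls still run
// after an earlier one fails, so the batch is not atomic.
func (b *batcher) flush() error {
	var first error
	for _, c := range b.pending {
		if c.key == "" {
			c.err = errors.New("empty key")
		} else {
			b.store[c.key] = c.value
		}
		if c.err != nil && first == nil {
			first = c.err
		}
	}
	b.pending = nil
	return first
}

func main() {
	b := newBatcher()
	b.prepare(&call{key: "a", value: "1"})
	b.prepare(&call{key: "", value: "2"}) // fails when flushed
	b.prepare(&call{key: "c", value: "3"})

	fmt.Println("stored before flush:", len(b.store))
	err := b.flush()
	fmt.Println("first error:", err, "stored after flush:", len(b.store))
}
```

In this sketch two of the three writes land even though flush reports an error, which is the situation a transaction is meant to rule out.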

Transactions are supported through the RunTransaction() method, which takes a retryable function, itself composed of the same simple mix of API calls typical of a non-transactional operation. Within the context of the RunTransaction call, all method invocations are transparently given necessary transactional details, and conflicts are handled with backoff/retry loops and transaction restarts as necessary. An example of using transactions with parallel writes:

kv := client.NewKV(client.NewHTTPSender("localhost:8080", &http.Transport{TLSClientConfig: tlsConfig}), nil)

opts := client.TransactionOptions{Name: "test", Isolation: proto.SERIALIZABLE}
err := kv.RunTransaction(&opts, func(txn *client.KV) error {
  for i := 0; i < 100; i++ {
    key := proto.Key(fmt.Sprintf("testkey-%02d", i))
    txn.Prepare(proto.Put, proto.PutArgs(key, []byte("test value")), &proto.PutResponse{})
  }

  // Note that the KV client is flushed automatically on transaction
  // commit. Invoking Flush after individual API methods is only
  // required if the result needs to be received to take conditional
  // action.
  return nil
})
if err != nil {
  log.Fatal(err)
}

Note that with Cockroach's lock-free transactions, clients should expect retries as a matter of course. This is why the transaction functionality is exposed through a retryable function. The retryable function should have no side effects which are not idempotent.

Transactions should endeavor to write using KV.Prepare calls. This allows writes to the same range to be batched together. In cases where the entire transaction affects only a single range, transactions can commit in a single round trip.

Constants

const (
	// KVDBEndpoint is the URL path prefix which accepts incoming
	// HTTP requests for the KV API.
	KVDBEndpoint = "/kv/db/"
	// KVDBScheme is the scheme for connecting to the kvdb endpoint.
	// TODO(spencer): change this to CONSTANT https. We shouldn't be
	// supporting http here at all.
	KVDBScheme = "http"
	// StatusTooManyRequests indicates client should retry due to
	// server having too many requests.
	StatusTooManyRequests = 429
)

Variables

var HTTPRetryOptions = util.RetryOptions{
	Backoff:     50 * time.Millisecond,
	MaxBackoff:  5 * time.Second,
	Constant:    2,
	MaxAttempts: 0,
}

HTTPRetryOptions sets the retry options for handling retryable HTTP errors and connection I/O errors.

var TxnRetryOptions = util.RetryOptions{
	Backoff:     50 * time.Millisecond,
	MaxBackoff:  5 * time.Second,
	Constant:    2,
	MaxAttempts: 0,
}

TxnRetryOptions sets the retry options for handling write conflicts.

Functions

This section is empty.

Types

type Call

type Call struct {
	Method string         // The name of the database command (see api.proto)
	Args   proto.Request  // The argument to the command
	Reply  proto.Response // The reply from the command
}

A Call is a pending database API call.

type Clock

type Clock interface {
	// Now returns nanoseconds since Jan 1, 1970 GMT.
	Now() int64
}

A Clock is an interface which provides the current time.

type HTTPSender

type HTTPSender struct {
	// contains filtered or unexported fields
}

HTTPSender is an implementation of KVSender which exposes the Key-Value database provided by a Cockroach cluster by connecting via HTTP to a Cockroach node. Overly-busy nodes will redirect this client to other nodes.

func NewHTTPSender

func NewHTTPSender(server string, transport *http.Transport) *HTTPSender

NewHTTPSender returns a new instance of HTTPSender.

func (*HTTPSender) Close

func (s *HTTPSender) Close()

Close implements the KVSender interface.

func (*HTTPSender) Send

func (s *HTTPSender) Send(call *Call)

Send sends call to Cockroach via an HTTP POST. Retryable HTTP response codes are retried with backoff in a loop using the default retry options. Other errors encountered while sending the HTTP request are retried indefinitely using the same client command ID; this avoids reporting failure when the command may in fact have gone through and executed successfully. Retrying with the same client command ID eventually gets through and returns the cached response.

type KV

type KV struct {
	// User is the default user to set on API calls. If User is set to
	// non-empty in call arguments, this value is ignored.
	User string
	// UserPriority is the default user priority to set on API calls. If
	// UserPriority is set non-zero in call arguments, this value is
	// ignored.
	UserPriority int32
	// contains filtered or unexported fields
}

KV provides serial access to a KV store via Call and parallel access via Prepare and Flush. A KV instance is not thread safe.

func NewKV

func NewKV(sender KVSender, clock Clock) *KV

NewKV creates a new instance of KV using the specified sender. To create a transactional client, the KV struct should be manually initialized in order to utilize a txnSender. Clock is used to formulate client command IDs, which provide idempotency on API calls. If clock is nil, the current wall time in nanoseconds (as returned by time.Now().UnixNano()) is used by default.

func (*KV) Call

func (kv *KV) Call(method string, args proto.Request, reply proto.Response) error

Call invokes the KV command synchronously, fills in reply, and returns an error, if applicable. If preceding calls have been made to Prepare() without a call to Flush(), this call is prepared and then all prepared calls are flushed.

Example

This is an example of using the Call() method to Put and then Get a value for a given key.

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"testing"

	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/proto"
	"github.com/cockroachdb/cockroach/rpc"
	"github.com/cockroachdb/cockroach/server"
	"github.com/cockroachdb/cockroach/storage"
	"github.com/cockroachdb/cockroach/util/log"
)

func StartTestServer(t *testing.T) *server.TestServer {
	s := &server.TestServer{}
	if err := s.Start(); err != nil {
		t.Fatalf("Could not start server: %v", err)
	}
	log.Infof("Test server listening on http: %s, rpc: %s", s.HTTPAddr, s.RPCAddr)
	return s
}

func main() {
	// Using built-in test server for this example code.
	serv := StartTestServer(nil)
	defer serv.Stop()

	// Replace with actual host:port address string (ex "localhost:8080") for server cluster.
	serverAddress := serv.HTTPAddr

	// Key Value Client initialization.
	sender := client.NewHTTPSender(serverAddress, &http.Transport{
		TLSClientConfig: rpc.LoadInsecureTLSConfig().Config(),
	})
	kvClient := client.NewKV(sender, nil)
	kvClient.User = storage.UserRoot
	defer kvClient.Close()

	key := proto.Key("a")
	value := []byte{1, 2, 3, 4}

	// Store test value.
	putResp := &proto.PutResponse{}
	if err := kvClient.Call(proto.Put, proto.PutArgs(key, value), putResp); err != nil {
		log.Fatal(err)
	}

	// Retrieve test value using same key.
	getResp := &proto.GetResponse{}
	if err := kvClient.Call(proto.Get, proto.GetArgs(key), getResp); err != nil {
		log.Fatal(err)
	}

	// Data validation.
	if getResp.Value == nil {
		log.Fatal("No value returned.")
	}
	if !bytes.Equal(value, getResp.Value.Bytes) {
		log.Fatal("Data mismatch on retrieved value.")
	}

	fmt.Println("Client example done.")
}
Output:

Client example done.

func (*KV) Close

func (kv *KV) Close()

Close closes the KV client and its sender.

func (*KV) Flush

func (kv *KV) Flush() (err error)

Flush sends all previously prepared calls, buffered by invocations of Prepare(). The calls are organized into a single batch command and sent together. Flush returns nil if all prepared calls are executed successfully. Otherwise, Flush returns the first error, where calls are executed in the order in which they were prepared. After Flush returns, all prepared reply structs will be valid.

func (*KV) GetI

func (kv *KV) GetI(key proto.Key, iface interface{}) (bool, proto.Timestamp, error)

GetI fetches the value at the specified key and gob-deserializes it into iface. The first return value is true if a value was found for the requested key and false otherwise. The second return value is the timestamp of the write. An error is returned if fetching from the underlying storage or deserializing the value fails.

func (*KV) GetProto

func (kv *KV) GetProto(key proto.Key, msg gogoproto.Message) (bool, proto.Timestamp, error)

GetProto fetches the value at the specified key and unmarshals it using a protobuf decoder. See comments for GetI for details on return values.

func (*KV) Prepare

func (kv *KV) Prepare(method string, args proto.Request, reply proto.Response)

Prepare accepts a KV API call, specified by method name, arguments and a reply struct. The call will be buffered locally until the first call to Flush(), at which time it will be sent for execution as part of a batch call. Using Prepare/Flush parallelizes queries and updates and should be used where possible for efficiency.

For clients using an HTTP sender, Prepare/Flush allows multiple commands to be sent over the same connection. For transactional clients, Prepare/Flush can dramatically improve efficiency by compressing multiple writes into a single atomic update in the event that the writes are to keys within a single range. However, using Prepare/Flush alone will not guarantee atomicity. Clients must use a transaction for that purpose.

The supplied reply struct will not be valid until after a call to Flush().

Example

This is an example of using the Prepare() method to submit multiple Key Value API operations to be run in parallel. Flush() is then used to begin execution of all the prepared operations.

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"testing"

	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/proto"
	"github.com/cockroachdb/cockroach/rpc"
	"github.com/cockroachdb/cockroach/server"
	"github.com/cockroachdb/cockroach/storage"
	"github.com/cockroachdb/cockroach/util/log"
)

func StartTestServer(t *testing.T) *server.TestServer {
	s := &server.TestServer{}
	if err := s.Start(); err != nil {
		t.Fatalf("Could not start server: %v", err)
	}
	log.Infof("Test server listening on http: %s, rpc: %s", s.HTTPAddr, s.RPCAddr)
	return s
}

func main() {
	// Using built-in test server for this example code.
	serv := StartTestServer(nil)
	defer serv.Stop()

	// Replace with actual host:port address string (ex "localhost:8080") for server cluster.
	serverAddress := serv.HTTPAddr

	// Key Value Client initialization.
	sender := client.NewHTTPSender(serverAddress, &http.Transport{
		TLSClientConfig: rpc.LoadInsecureTLSConfig().Config(),
	})
	kvClient := client.NewKV(sender, nil)
	kvClient.User = storage.UserRoot
	defer kvClient.Close()

	// Insert test data.
	batchSize := 12
	keys := make([]string, batchSize)
	values := make([][]byte, batchSize)
	for i := 0; i < batchSize; i++ {
		keys[i] = fmt.Sprintf("key-%03d", i)
		values[i] = []byte(fmt.Sprintf("value-%03d", i))

		putReq := proto.PutArgs(proto.Key(keys[i]), values[i])
		putResp := &proto.PutResponse{}
		kvClient.Prepare(proto.Put, putReq, putResp)
	}

	// Flush all puts for parallel execution.
	if err := kvClient.Flush(); err != nil {
		log.Fatal(err)
	}

	// Scan for the newly inserted rows in parallel.
	numScans := 3
	rowsPerScan := batchSize / numScans
	scanResponses := make([]proto.ScanResponse, numScans)
	for i := 0; i < numScans; i++ {
		firstKey := proto.Key(keys[i*rowsPerScan])
		lastKey := proto.Key(keys[((i+1)*rowsPerScan)-1])
		kvClient.Prepare(proto.Scan, proto.ScanArgs(firstKey, lastKey.Next(), int64(rowsPerScan)), &scanResponses[i])
	}
	// Flush all scans for parallel execution.
	if err := kvClient.Flush(); err != nil {
		log.Fatal(err)
	}

	// Check results which may be returned out-of-order from creation.
	var matchCount int
	for i := 0; i < numScans; i++ {
		for _, keyVal := range scanResponses[i].Rows {
			currKey := keyVal.Key
			currValue := keyVal.Value.Bytes
			for j, origKey := range keys {
				if bytes.Equal(currKey, proto.Key(origKey)) && bytes.Equal(currValue, values[j]) {
					matchCount++
				}
			}
		}
	}
	if matchCount != batchSize {
		log.Fatal("Data mismatch.")
	}

	fmt.Println("Prepare Flush example done.")
}
Output:

Prepare Flush example done.

func (*KV) PreparePutProto

func (kv *KV) PreparePutProto(key proto.Key, msg gogoproto.Message) error

PreparePutProto sets the given key to the protobuf-serialized byte string of msg. The resulting Put call is buffered and will not be sent until a subsequent call to Flush. Returns marshalling errors if encountered.

func (*KV) PutI

func (kv *KV) PutI(key proto.Key, iface interface{}) error

PutI sets the given key to the gob-serialized byte string of iface.
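
For readers unfamiliar with gob, the round trip that PutI and GetI rely on looks roughly like the following. This sketch uses only the standard encoding/gob package; the account type and the encode and decode helpers are hypothetical, and the code does not show how the client itself stores or fetches the bytes.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// account is a hypothetical value type; gob requires exported fields.
type account struct {
	Owner   string
	Balance int64
}

// encode gob-serializes v into a byte slice, as a put would before writing.
func encode(v interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode gob-deserializes data into v, which must be a pointer.
func decode(data []byte, v interface{}) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
}

func main() {
	data, err := encode(account{Owner: "alice", Balance: 42})
	if err != nil {
		panic(err)
	}

	var got account
	if err := decode(data, &got); err != nil {
		panic(err)
	}
	fmt.Println(got.Owner, got.Balance)
}
```

Note that the decoding side must be given a pointer, which suggests the iface argument passed to GetI should be a pointer to the destination value.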

func (*KV) PutProto

func (kv *KV) PutProto(key proto.Key, msg gogoproto.Message) error

PutProto sets the given key to the protobuf-serialized byte string of msg.

func (*KV) RunTransaction

func (kv *KV) RunTransaction(opts *TransactionOptions, retryable func(txn *KV) error) error

RunTransaction executes retryable in the context of a distributed transaction. The transaction is automatically aborted if retryable returns any error aside from recoverable internal errors, and is automatically committed otherwise. retryable should have no side effects which could cause problems in the event it must be run more than once. The opts struct contains transaction settings.

Calling RunTransaction on the transactional KV client which is supplied to the retryable function is an error.

Example

This is an example of using the RunTransaction() method to submit multiple Key Value API operations inside a transaction.

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"testing"

	"github.com/cockroachdb/cockroach/client"
	"github.com/cockroachdb/cockroach/proto"
	"github.com/cockroachdb/cockroach/rpc"
	"github.com/cockroachdb/cockroach/server"
	"github.com/cockroachdb/cockroach/storage"
	"github.com/cockroachdb/cockroach/util/log"
)

func StartTestServer(t *testing.T) *server.TestServer {
	s := &server.TestServer{}
	if err := s.Start(); err != nil {
		t.Fatalf("Could not start server: %v", err)
	}
	log.Infof("Test server listening on http: %s, rpc: %s", s.HTTPAddr, s.RPCAddr)
	return s
}

func main() {
	// Using built-in test server for this example code.
	serv := StartTestServer(nil)
	defer serv.Stop()

	// Replace with actual host:port address string (ex "localhost:8080") for server cluster.
	serverAddress := serv.HTTPAddr

	// Key Value Client initialization.
	sender := client.NewHTTPSender(serverAddress, &http.Transport{
		TLSClientConfig: rpc.LoadInsecureTLSConfig().Config(),
	})
	kvClient := client.NewKV(sender, nil)
	kvClient.User = storage.UserRoot
	defer kvClient.Close()

	// Create test data.
	numKVPairs := 10
	keys := make([]string, numKVPairs)
	values := make([][]byte, numKVPairs)
	for i := 0; i < numKVPairs; i++ {
		keys[i] = fmt.Sprintf("testkey-%03d", i)
		values[i] = []byte(fmt.Sprintf("testvalue-%03d", i))
	}

	// Insert all KV pairs inside a transaction.
	putOpts := client.TransactionOptions{Name: "example put"}
	err := kvClient.RunTransaction(&putOpts, func(txn *client.KV) error {
		for i := 0; i < numKVPairs; i++ {
			txn.Prepare(proto.Put, proto.PutArgs(proto.Key(keys[i]), values[i]), &proto.PutResponse{})
		}
		// Note that the KV client is flushed automatically on transaction
		// commit. Invoking Flush after individual API methods is only
		// required if the result needs to be received to take conditional
		// action.
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}

	// Read back KV pairs inside a transaction.
	getResponses := make([]proto.GetResponse, numKVPairs)
	getOpts := client.TransactionOptions{Name: "example get"}
	err = kvClient.RunTransaction(&getOpts, func(txn *client.KV) error {
		for i := 0; i < numKVPairs; i++ {
			txn.Prepare(proto.Get, proto.GetArgs(proto.Key(keys[i])), &getResponses[i])
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}

	// Check results.
	for i, getResp := range getResponses {
		if getResp.Value == nil {
			log.Fatal("No value returned for ", keys[i])
		} else {
			if !bytes.Equal(values[i], getResp.Value.Bytes) {
				log.Fatal("Data mismatch for ", keys[i], ", got: ", getResp.Value.Bytes)
			}
		}
	}

	fmt.Println("Transaction example done.")
}
Output:

Transaction example done.

func (*KV) Sender

func (kv *KV) Sender() KVSender

Sender returns the sender supplied to NewKV, unless wrapped by a transactional sender, in which case returns the unwrapped sender.

type KVSender

type KVSender interface {
	// Send invokes the Call.Method with Call.Args and sets the result
	// in Call.Reply.
	Send(*Call)
	// Close frees up resources in use by the sender.
	Close()
}

KVSender is an interface for sending a request to a Key-Value database backend.
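
Since KVSender has only two methods, an in-memory implementation is easy to write, for instance as a test double. The sketch below redeclares simplified versions of Call and KVSender so that it compiles on its own; putArgs, getArgs, reply, and memSender are hypothetical and far simpler than the real proto.Request and proto.Response types. Because Send has no return value, this sketch reports failures through the reply, which is an assumption about how errors would surface.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical, simplified request and reply types.
type putArgs struct{ Key, Value string }
type getArgs struct{ Key string }
type reply struct {
	Value string
	Found bool
	Err   error
}

// Call and KVSender are simplified local versions of the types above.
type Call struct {
	Method string
	Args   interface{}
	Reply  *reply
}

type KVSender interface {
	Send(*Call)
	Close()
}

// memSender serves calls from an in-memory map.
type memSender struct {
	data   map[string]string
	closed bool
}

// Send executes the call and records the outcome in call.Reply.
func (m *memSender) Send(c *Call) {
	if m.closed {
		c.Reply.Err = errors.New("sender closed")
		return
	}
	switch args := c.Args.(type) {
	case putArgs:
		m.data[args.Key] = args.Value
	case getArgs:
		c.Reply.Value, c.Reply.Found = m.data[args.Key]
	default:
		c.Reply.Err = fmt.Errorf("unsupported method %q", c.Method)
	}
}

// Close marks the sender unusable; there are no resources to free here.
func (m *memSender) Close() { m.closed = true }

func main() {
	var s KVSender = &memSender{data: map[string]string{}}
	defer s.Close()

	s.Send(&Call{Method: "Put", Args: putArgs{Key: "a", Value: "1"}, Reply: &reply{}})
	get := &Call{Method: "Get", Args: getArgs{Key: "a"}, Reply: &reply{}}
	s.Send(get)
	fmt.Println(get.Reply.Value, get.Reply.Found, get.Reply.Err)
}
```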

type TransactionOptions

type TransactionOptions struct {
	Name      string // Concise desc of txn for debugging
	Isolation proto.IsolationType
}

TransactionOptions are parameters for use with KV.RunTransaction.
