tarantool

v0.0.0-...-13f719a
Published: Sep 3, 2020 License: MIT Imports: 27 Imported by: 0

README

go-tarantool

The go-tarantool package has everything necessary for interfacing with Tarantool 1.6+.

Tarantool is an application server plus a DBMS. The advantage of integrating Go with it is that Go programs can query databases and, according to public benchmarks, get responses faster than with other packages.

Key features

  • Support for both encoding and decoding of Tarantool queries/commands, which leads us to the following advantages:
    • implementing services that mimic a real Tarantool DBMS is relatively easy; for example, you can code a service which would relay queries and commands to a real Tarantool instance; the server interface is documented here;
    • replication support: you can implement a service which would mimic a Tarantool replication slave and get on-the-fly data updates from the Tarantool master, an example is provided here.
  • The interface for sending and packing queries differs from other go-tarantool implementations, and you may find it more pleasant to work with: all queries are represented by different types that follow the same interface rather than by individual methods in the connector, e.g. conn.Exec(&Update{...}) vs conn.Update({}).
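
The "queries as types" idea in the last bullet can be sketched with the standard library alone. The Query interface below is a reduced, hypothetical version that only keeps GetCommandID, and Insert and Ping are simplified stand-ins for the package's request types; the command codes are copied from the package constants.

```go
package main

import "fmt"

// Command codes copied from the package constants.
const (
	InsertCommand = uint(2)
	PingCommand   = uint(64)
)

// Query is a reduced, hypothetical version of the interface that all
// request types share.
type Query interface {
	GetCommandID() uint
}

// Insert and Ping are simplified stand-ins for the package's request types.
type Insert struct {
	Space string
	Tuple []interface{}
}

type Ping struct{}

func (q *Insert) GetCommandID() uint { return InsertCommand }
func (q *Ping) GetCommandID() uint   { return PingCommand }

// describe accepts any request through the shared interface, in the same
// way that a single Exec method can accept every request type.
func describe(q Query) string {
	return fmt.Sprintf("%T uses command %d", q, q.GetCommandID())
}

func main() {
	fmt.Println(describe(&Insert{Space: "examples", Tuple: []interface{}{uint64(1), "a"}}))
	fmt.Println(describe(&Ping{}))
}
```

Adding a new request then means adding a new type, not a new method on the connection.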

Installation

Pre-requisites:

  • Tarantool version 1.6 or 1.7,
  • a modern Linux, BSD or Mac OS operating system,
  • a current version of Go, version 1.8 or later (use go version to check the version number).

If your Go version is older than 1.8, or if Go is not installed, download the latest tarball from golang.org and run:

sudo tar -C /usr/local -xzf go1.8.3.linux-amd64.tar.gz
sudo chmod -R a+rwx /usr/local/go

Make sure Go and go-tarantool are on your path. For example:

export PATH=$PATH:/usr/local/go/bin
export GOPATH="/usr/local/go/go-tarantool"

The go-tarantool package is in the viciious/go-tarantool repository. To download and install it, run:

go get github.com/viciious/go-tarantool

This should bring source and binary files into subdirectories of /usr/local/go, so that the package can be used by adding github.com/viciious/go-tarantool to the import (...) section at the start of any Go program.

Hello World

Here is a very short example Go program which tries to connect to a Tarantool server.

package main

import (
    "context"
    "fmt"
    "github.com/viciious/go-tarantool"
)

func main() {
    opts := tarantool.Options{User: "guest"}
    conn, err := tarantool.Connect("127.0.0.1:3301", &opts)
    if err != nil {
        fmt.Printf("Connection refused: %s\n", err.Error())
        return
    }

    query := &tarantool.Insert{Space: "examples", Tuple: []interface{}{uint64(99999), "BB"}}
    resp := conn.Exec(context.Background(), query)

    if resp.Error != nil {
        fmt.Println("Insert failed", resp.Error)
    } else {
        fmt.Printf("Insert succeeded: %#v\n", resp.Data)
    }

    conn.Close()
}

Cut and paste this example into a file named example.go.

Start a Tarantool server on localhost, and make sure it is listening on port 3301. Set up a space named examples exactly as described in the Tarantool manual's Connectors section.

Again, make sure PATH and GOPATH point to the right places. Then build and run example.go:

go build example.go
./example

You should see a message saying either "Insert failed" or "Insert succeeded".

If that is what you see, then you have successfully installed go-tarantool and successfully executed a program that connected to a Tarantool server and manipulated the contents of a Tarantool database.

Walking through the example

We can now have a closer look at the example.go program and make some observations about what it does.

Observation 1: the line "github.com/viciious/go-tarantool" in the import(...) section brings in all Tarantool-related functions and structures. It is common to bring in context and fmt as well.

Observation 2: the line beginning with "opts :=" sets up the options for Connect(). In this example, there is only one thing in the structure, a user name. The structure can also contain:

  • ConnectTimeout (a time.Duration: how long the connector will wait for a new connection to be established before giving up),
  • QueryTimeout (the default maximum time to wait for a query before giving up; can be overridden on a per-query basis),
  • DefaultSpace (the name of the default Tarantool space),
  • Password (the user's password),
  • UUID (used for replication),
  • ReplicaSetUUID (used for replication).
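
Because the timeouts are durations rather than bare numbers, it helps to write them with the time package's units. The sketch below uses a local, reduced mirror of the options structure and a hypothetical withDefaults helper. It assumes that QueryTimeout is a time.Duration like ConnectTimeout and that zero values fall back to the package defaults of one second; neither detail is confirmed here.

```go
package main

import (
	"fmt"
	"time"
)

// options is a local, reduced mirror of the package's Options struct.
// Assumption: QueryTimeout has the same type as ConnectTimeout.
type options struct {
	User           string
	ConnectTimeout time.Duration
	QueryTimeout   time.Duration
}

// withDefaults fills in zero-valued timeouts with one second, the value of
// DefaultConnectTimeout and DefaultQueryTimeout. Whether the package applies
// its defaults exactly like this is an assumption.
func withDefaults(o options) options {
	if o.ConnectTimeout == 0 {
		o.ConnectTimeout = time.Second
	}
	if o.QueryTimeout == 0 {
		o.QueryTimeout = time.Second
	}
	return o
}

func main() {
	o := withDefaults(options{User: "guest", ConnectTimeout: 500 * time.Millisecond})
	fmt.Println("connect timeout:", o.ConnectTimeout)
	fmt.Println("query timeout:", o.QueryTimeout)
}
```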

Observation 3: the line containing "tarantool.Connect" is one way to begin a session. There are two parameters:

  • a string with host:port format (or "/path/to/tarantool.socket"), and
  • the option structure that was set up earlier.

There is an alternative way to connect, described in the section "Alternative way to connect".

Observation 4: the err value will be nil if there is no error; otherwise it will carry a description which can be retrieved with err.Error().

Observation 5: the conn.Exec request, like many requests, is preceded by "conn.", which is the name of the object that was returned by Connect(). In this case, for Insert, there are two parameters:

  • a space name (it could just as easily have been a space number), and
  • a tuple.

All the requests described in the Tarantool manual can be expressed in a similar way within conn.Exec(), with the format "&name-of-request{arguments}". For example: &Ping{}. A longer example:

    resp := conn.Exec(context.Background(), &Update{
        Space: "tester",
        Index: "primary",
        Key:   1,
        Set: []Operator{
            &OpAdd{
                Field:    2,
                Argument: 17,
            },
            &OpAssign{
                Field:    1,
                Argument: "Hello World",
            },
        },
    })

API reference

Read the Tarantool manual to find descriptions of terms like "connect", "space", "index", and the requests for creating and manipulating database objects or Lua functions.

The source files for the requests library are:

  • connection.go for the Connect() function plus functions related to connecting, and
  • insert_test.go for an example of a data-manipulation function used in tests.

See the comments in these files for syntax details.

The supported requests have parameters and results equivalent to requests in the Tarantool manual. Browsing through the other *.go programs in the package will show how the packagers have paid attention to some of the more advanced features of Tarantool, such as vclock and replication.

Alternative way to connect

Here we show a variation of example.go, where the connection is established in a different way.


package main

import (
    "context"
    "fmt"
    "github.com/viciious/go-tarantool"
)

func main() {
    opts := tarantool.Options{User: "guest"}
    tnt := tarantool.New("127.0.0.1:3301", &opts)
    conn, err := tnt.Connect()
    if err != nil {
        fmt.Printf("Connection refused: %s\n", err.Error())
        return
    }

    query := &tarantool.Insert{Space: "examples", Tuple: []interface{}{uint64(99999), "BB"}}
    resp := conn.Exec(context.Background(), query)

    if resp.Error != nil {
        fmt.Println("Insert failed", resp.Error)
    } else {
        fmt.Printf("Insert succeeded: %#v\n", resp.Data)
    }

    conn.Close()
}

In this variation, tarantool.New returns a Connector instance, which is a goroutine-safe singleton object that can transparently handle reconnects.
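
The behaviour described above, one shared connection that is replaced transparently once it is closed, can be sketched with a mutex. The conn and connector types here are hypothetical stand-ins written for this illustration, not the package's Connection and Connector, and no network traffic is involved.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for a live connection.
type conn struct {
	id     int
	closed bool
}

// connector hands out one shared connection and replaces it once it has
// been closed.
type connector struct {
	mu    sync.Mutex
	cur   *conn
	dials int
}

// Connect returns the existing connection, or "dials" a new one if there is
// none or the current one is closed. The mutex makes it safe to call from
// several goroutines.
func (c *connector) Connect() *conn {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.cur == nil || c.cur.closed {
		c.dials++
		c.cur = &conn{id: c.dials}
	}
	return c.cur
}

func main() {
	c := &connector{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Connect()
		}()
	}
	wg.Wait()
	fmt.Println("dials after 4 concurrent calls:", c.dials)

	// Simulate a dropped connection; the next call dials again.
	c.Connect().closed = true
	fmt.Println("connection id after reconnect:", c.Connect().id)
}
```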

Help

To contact the go-tarantool developers about any problems, create an issue at viciious/go-tarantool.

The developers of the Tarantool server will also be happy to provide advice or receive feedback.

Documentation

Constants

const (
	OKCommand        = uint(0)
	SelectCommand    = uint(1)
	InsertCommand    = uint(2)
	ReplaceCommand   = uint(3)
	UpdateCommand    = uint(4)
	DeleteCommand    = uint(5)
	CallCommand      = uint(6)
	AuthCommand      = uint(7)
	EvalCommand      = uint(8)
	UpsertCommand    = uint(9)
	Call17Command    = uint(10) // Tarantool >= 1.7.2
	PingCommand      = uint(64)
	JoinCommand      = uint(65)
	SubscribeCommand = uint(66)
	VoteCommand      = uint(68) // Tarantool >= 1.9.0
	ErrorFlag        = uint(0x8000)
)
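
A short sketch of how ErrorFlag relates to the error codes listed further down. It assumes, as in Tarantool's binary protocol, that a failed response carries ErrorFlag combined with the numeric error code in its response code; splitCode is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Values copied from the package constants.
const (
	OKCommand     = uint(0)
	ErrorFlag     = uint(0x8000)
	ErrTupleFound = uint(0x03)
)

// splitCode reports whether a response code signals an error and, if so,
// returns the error code with the flag bit cleared.
func splitCode(code uint) (isError bool, errCode uint) {
	if code&ErrorFlag == 0 {
		return false, 0
	}
	return true, code &^ ErrorFlag
}

func main() {
	isErr, errCode := splitCode(ErrorFlag | ErrTupleFound)
	fmt.Println(isErr, errCode == ErrTupleFound)

	isErr, _ = splitCode(OKCommand)
	fmt.Println(isErr)
}
```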
const (
	KeyCode           = uint(0x00)
	KeySync           = uint(0x01)
	KeyInstanceID     = uint(0x02)
	KeyLSN            = uint(0x03)
	KeyTimestamp      = uint(0x04)
	KeySchemaID       = uint(0x05)
	KeySpaceNo        = uint(0x10)
	KeyIndexNo        = uint(0x11)
	KeyLimit          = uint(0x12)
	KeyOffset         = uint(0x13)
	KeyIterator       = uint(0x14)
	KeyKey            = uint(0x20)
	KeyTuple          = uint(0x21)
	KeyFunctionName   = uint(0x22)
	KeyUserName       = uint(0x23)
	KeyInstanceUUID   = uint(0x24)
	KeyReplicaSetUUID = uint(0x25)
	KeyVClock         = uint(0x26)
	KeyExpression     = uint(0x27)
	KeyDefTuple       = uint(0x28)
	KeyBallot         = uint(0x29) // Tarantool >= 1.9.0
	KeyData           = uint(0x30)
	KeyError          = uint(0x31)
)
const (
	// https://github.com/fl00r/go-tarantool-1.6/issues/2
	IterEq            = uint8(0) // key == x ASC order
	IterReq           = uint8(1) // key == x DESC order
	IterAll           = uint8(2) // all tuples
	IterLt            = uint8(3) // key < x
	IterLe            = uint8(4) // key <= x
	IterGe            = uint8(5) // key >= x
	IterGt            = uint8(6) // key > x
	IterBitsAllSet    = uint8(7) // all bits from x are set in key
	IterBitsAnySet    = uint8(8) // at least one x's bit is set
	IterBitsAllNotSet = uint8(9) // all bits are not set
)
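
To make the comparison iterators concrete, here is a small sketch that applies them as predicates to a list of integer keys. The constants are copied from the block above; matches and filter are hypothetical helpers, the bitset iterators are left out, and result ordering (ascending or descending) is not modelled.

```go
package main

import "fmt"

// Iterator codes copied from the package constants (bitset iterators omitted).
const (
	IterEq  = uint8(0)
	IterReq = uint8(1)
	IterAll = uint8(2)
	IterLt  = uint8(3)
	IterLe  = uint8(4)
	IterGe  = uint8(5)
	IterGt  = uint8(6)
)

// matches reports whether key satisfies the iterator's condition against x.
func matches(iter uint8, key, x int) bool {
	switch iter {
	case IterEq, IterReq:
		return key == x
	case IterAll:
		return true
	case IterLt:
		return key < x
	case IterLe:
		return key <= x
	case IterGe:
		return key >= x
	case IterGt:
		return key > x
	}
	return false
}

// filter keeps the keys that match, preserving their input order.
func filter(iter uint8, keys []int, x int) []int {
	var out []int
	for _, k := range keys {
		if matches(iter, k, x) {
			out = append(out, k)
		}
	}
	return out
}

func main() {
	keys := []int{1, 3, 5, 7}
	fmt.Println(filter(IterGe, keys, 5)) // [5 7]
	fmt.Println(filter(IterLt, keys, 5)) // [1 3]
}
```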
const (
	SchemaKeyClusterUUID = "cluster"
	ReplicaSetMaxSize    = 32
	VClockMax            = ReplicaSetMaxSize
	UUIDStrLength        = 36
)
const (
	SpaceSchema    = uint(272)
	SpaceSpace     = uint(280)
	ViewSpace      = uint(281)
	SpaceIndex     = uint(288)
	ViewIndex      = uint(289)
	SpaceFunc      = uint(296)
	SpaceUser      = uint(304)
	SpacePriv      = uint(312)
	SpaceCluster   = uint(320)
	SpaceSystemMax = uint(511)
)
const (
	ErrUnknown                       = uint(0x00) // Unknown error
	ErrIllegalParams                 = uint(0x01) // Illegal parameters, %s
	ErrMemoryIssue                   = uint(0x02) // Failed to allocate %u bytes in %s for %s
	ErrTupleFound                    = uint(0x03) // Duplicate key exists in unique index '%s' in space '%s'
	ErrTupleNotFound                 = uint(0x04) // Tuple doesn't exist in index '%s' in space '%s'
	ErrUnsupported                   = uint(0x05) // %s does not support %s
	ErrNonmaster                     = uint(0x06) // Can't modify data on a replication slave. My master is: %s
	ErrReadonly                      = uint(0x07) // Can't modify data because this server is in read-only mode.
	ErrInjection                     = uint(0x08) // Error injection '%s'
	ErrCreateSpace                   = uint(0x09) // Failed to create space '%s': %s
	ErrSpaceExists                   = uint(0x0a) // Space '%s' already exists
	ErrDropSpace                     = uint(0x0b) // Can't drop space '%s': %s
	ErrAlterSpace                    = uint(0x0c) // Can't modify space '%s': %s
	ErrIndexType                     = uint(0x0d) // Unsupported index type supplied for index '%s' in space '%s'
	ErrModifyIndex                   = uint(0x0e) // Can't create or modify index '%s' in space '%s': %s
	ErrLastDrop                      = uint(0x0f) // Can't drop the primary key in a system space, space '%s'
	ErrTupleFormatLimit              = uint(0x10) // Tuple format limit reached: %u
	ErrDropPrimaryKey                = uint(0x11) // Can't drop primary key in space '%s' while secondary keys exist
	ErrKeyPartType                   = uint(0x12) // Supplied key type of part %u does not match index part type: expected %s
	ErrExactMatch                    = uint(0x13) // Invalid key part count in an exact match (expected %u, got %u)
	ErrInvalidMsgpack                = uint(0x14) // Invalid MsgPack - %s
	ErrProcRet                       = uint(0x15) // msgpack.encode: can not encode Lua type '%s'
	ErrTupleNotArray                 = uint(0x16) // Tuple/Key must be MsgPack array
	ErrFieldType                     = uint(0x17) // Tuple field %u type does not match one required by operation: expected %s
	ErrFieldTypeMismatch             = uint(0x18) // Ambiguous field type in index '%s', key part %u. Requested type is %s but the field has previously been defined as %s
	ErrSplice                        = uint(0x19) // SPLICE error on field %u: %s
	ErrArgType                       = uint(0x1a) // Argument type in operation '%c' on field %u does not match field type: expected a %s
	ErrTupleIsTooLong                = uint(0x1b) // Tuple is too long %u
	ErrUnknownUpdateOp               = uint(0x1c) // Unknown UPDATE operation
	ErrUpdateField                   = uint(0x1d) // Field %u UPDATE error: %s
	ErrFiberStack                    = uint(0x1e) // Can not create a new fiber: recursion limit reached
	ErrKeyPartCount                  = uint(0x1f) // Invalid key part count (expected [0..%u], got %u)
	ErrProcLua                       = uint(0x20) // %s
	ErrNoSuchProc                    = uint(0x21) // Procedure '%.*s' is not defined
	ErrNoSuchTrigger                 = uint(0x22) // Trigger is not found
	ErrNoSuchIndex                   = uint(0x23) // No index #%u is defined in space '%s'
	ErrNoSuchSpace                   = uint(0x24) // Space '%s' does not exist
	ErrNoSuchField                   = uint(0x25) // Field %d was not found in the tuple
	ErrSpaceFieldCount               = uint(0x26) // Tuple field count %u does not match space '%s' field count %u
	ErrIndexFieldCount               = uint(0x27) // Tuple field count %u is less than required by a defined index (expected %u)
	ErrWalIo                         = uint(0x28) // Failed to write to disk
	ErrMoreThanOneTuple              = uint(0x29) // More than one tuple found by get()
	ErrAccessDenied                  = uint(0x2a) // %s access denied for user '%s'
	ErrCreateUser                    = uint(0x2b) // Failed to create user '%s': %s
	ErrDropUser                      = uint(0x2c) // Failed to drop user '%s': %s
	ErrNoSuchUser                    = uint(0x2d) // User '%s' is not found
	ErrUserExists                    = uint(0x2e) // User '%s' already exists
	ErrPasswordMismatch              = uint(0x2f) // Incorrect password supplied for user '%s'
	ErrUnknownRequestType            = uint(0x30) // Unknown request type %u
	ErrUnknownSchemaObject           = uint(0x31) // Unknown object type '%s'
	ErrCreateFunction                = uint(0x32) // Failed to create function '%s': %s
	ErrNoSuchFunction                = uint(0x33) // Function '%s' does not exist
	ErrFunctionExists                = uint(0x34) // Function '%s' already exists
	ErrFunctionAccessDenied          = uint(0x35) // %s access denied for user '%s' to function '%s'
	ErrFunctionMax                   = uint(0x36) // A limit on the total number of functions has been reached: %u
	ErrSpaceAccessDenied             = uint(0x37) // %s access denied for user '%s' to space '%s'
	ErrUserMax                       = uint(0x38) // A limit on the total number of users has been reached: %u
	ErrNoSuchEngine                  = uint(0x39) // Space engine '%s' does not exist
	ErrReloadCfg                     = uint(0x3a) // Can't set option '%s' dynamically
	ErrCfg                           = uint(0x3b) // Incorrect value for option '%s': %s
	ErrSophia                        = uint(0x3c) // %s
	ErrLocalServerIsNotActive        = uint(0x3d) // Local server is not active
	ErrUnknownServer                 = uint(0x3e) // Server %s is not registered with the cluster
	ErrClusterIDMismatch             = uint(0x3f) // Cluster id of the replica %s doesn't match cluster id of the master %s
	ErrInvalidUUID                   = uint(0x40) // Invalid UUID: %s
	ErrClusterIDIsRo                 = uint(0x41) // Can't reset cluster id: it is already assigned
	ErrReserved66                    = uint(0x42) // Reserved66
	ErrServerIDIsReserved            = uint(0x43) // Can't initialize server id with a reserved value %u
	ErrInvalidOrder                  = uint(0x44) // Invalid LSN order for server %u: previous LSN = %llu, new lsn = %llu
	ErrMissingRequestField           = uint(0x45) // Missing mandatory field '%s' in request
	ErrIdentifier                    = uint(0x46) // Invalid identifier '%s' (expected letters, digits or an underscore)
	ErrDropFunction                  = uint(0x47) // Can't drop function %u: %s
	ErrIteratorType                  = uint(0x48) // Unknown iterator type '%s'
	ErrReplicaMax                    = uint(0x49) // Replica count limit reached: %u
	ErrInvalidXlog                   = uint(0x4a) // Failed to read xlog: %lld
	ErrInvalidXlogName               = uint(0x4b) // Invalid xlog name: expected %lld got %lld
	ErrInvalidXlogOrder              = uint(0x4c) // Invalid xlog order: %lld and %lld
	ErrNoConnection                  = uint(0x4d) // Connection is not established
	ErrTimeout                       = uint(0x4e) // Timeout exceeded
	ErrActiveTransaction             = uint(0x4f) // Operation is not permitted when there is an active transaction
	ErrNoActiveTransaction           = uint(0x50) // Operation is not permitted when there is no active transaction
	ErrCrossEngineTransaction        = uint(0x51) // A multi-statement transaction can not use multiple storage engines
	ErrNoSuchRole                    = uint(0x52) // Role '%s' is not found
	ErrRoleExists                    = uint(0x53) // Role '%s' already exists
	ErrCreateRole                    = uint(0x54) // Failed to create role '%s': %s
	ErrIndexExists                   = uint(0x55) // Index '%s' already exists
	ErrTupleRefOverflow              = uint(0x56) // Tuple reference counter overflow
	ErrRoleLoop                      = uint(0x57) // Granting role '%s' to role '%s' would create a loop
	ErrGrant                         = uint(0x58) // Incorrect grant arguments: %s
	ErrPrivGranted                   = uint(0x59) // User '%s' already has %s access on %s '%s'
	ErrRoleGranted                   = uint(0x5a) // User '%s' already has role '%s'
	ErrPrivNotGranted                = uint(0x5b) // User '%s' does not have %s access on %s '%s'
	ErrRoleNotGranted                = uint(0x5c) // User '%s' does not have role '%s'
	ErrMissingSnapshot               = uint(0x5d) // Can't find snapshot
	ErrCantUpdatePrimaryKey          = uint(0x5e) // Attempt to modify a tuple field which is part of index '%s' in space '%s'
	ErrUpdateIntegerOverflow         = uint(0x5f) // Integer overflow when performing '%c' operation on field %u
	ErrGuestUserPassword             = uint(0x60) // Setting password for guest user has no effect
	ErrTransactionConflict           = uint(0x61) // Transaction has been aborted by conflict
	ErrUnsupportedRolePriv           = uint(0x62) // Unsupported role privilege '%s'
	ErrLoadFunction                  = uint(0x63) // Failed to dynamically load function '%s': %s
	ErrFunctionLanguage              = uint(0x64) // Unsupported language '%s' specified for function '%s'
	ErrRtreeRect                     = uint(0x65) // RTree: %s must be an array with %u (point) or %u (rectangle/box) numeric coordinates
	ErrProcC                         = uint(0x66) // ???
	ErrUnknownRtreeIndexDistanceType = uint(0x67) //Unknown RTREE index distance type %s
	ErrProtocol                      = uint(0x68) // %s
	ErrUpsertUniqueSecondaryKey      = uint(0x69) // Space %s has a unique secondary index and does not support UPSERT
	ErrWrongIndexRecord              = uint(0x6a) // Wrong record in _index space: got {%s}, expected {%s}
	ErrWrongIndexParts               = uint(0x6b) // Wrong index parts (field %u): %s; expected field1 id (number), field1 type (string), ...
	ErrWrongIndexOptions             = uint(0x6c) // Wrong index options (field %u): %s
	ErrWrongSchemaVaersion           = uint(0x6d) // Wrong schema version, current: %d, in request: %u
	ErrSlabAllocMax                  = uint(0x6e) // Failed to allocate %u bytes for tuple in the slab allocator: tuple is too large. Check 'slab_alloc_maximal' configuration option.
)

Tarantool server error codes

const (
	DefaultIndex = "primary"
	DefaultLimit = 100

	DefaultConnectTimeout = time.Second
	DefaultQueryTimeout   = time.Second
)
const (
	GreetingSize = 128
)
const (
	ServerIdent = "Tarantool 1.6.8 (Binary)"
)
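
GreetingSize and ServerIdent relate to the greeting a Tarantool server sends when a client connects. The sketch below assumes the layout used by Tarantool's binary protocol: 128 bytes made of two 64-byte text lines, the first with the server banner and the second with a base64-encoded salt. How the package parses the greeting is not shown here; splitGreeting and pad are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// GreetingSize is copied from the package constants.
const GreetingSize = 128

// splitGreeting splits a raw greeting into its two 64-byte text lines and
// trims the padding from each.
func splitGreeting(raw []byte) (banner, salt string, err error) {
	if len(raw) != GreetingSize {
		return "", "", errors.New("invalid greeting")
	}
	banner = strings.TrimSpace(string(raw[:64]))
	salt = strings.TrimSpace(string(raw[64:]))
	return banner, salt, nil
}

// pad fills s with spaces up to n-1 bytes and terminates it with a newline.
func pad(s string, n int) string {
	return s + strings.Repeat(" ", n-1-len(s)) + "\n"
}

func main() {
	// "c2FsdA==" is just the base64 form of "salt", used as sample data.
	raw := []byte(pad("Tarantool 1.6.8 (Binary)", 64) + pad("c2FsdA==", 64))
	banner, salt, err := splitGreeting(raw)
	fmt.Println(banner)
	fmt.Println(salt)
	fmt.Println(err)
}
```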

Variables

var (
	ErrInvalidGreeting   = errors.New("invalid greeting")
	ErrEmptyDefaultSpace = errors.New("zero-length default space or unnecessary slash in dsn.path")
	ErrSyncFailed        = errors.New("SYNC failed")
)
var (
	DefaultReaderBufSize = 128 * 1024
	DefaultWriterBufSize = 4 * 1024
)
var (
	// ErrNotSupported is returned when an unimplemented query type or operation is encountered.
	ErrNotSupported = NewQueryError(ErrUnsupported, "not supported yet")
	// ErrNotInReplicaSet means that join operation can not be performed on a replica set due to missing parameters.
	ErrNotInReplicaSet = NewQueryError(0, "Full Replica Set params hasn't been set")
	// ErrBadResult means that query result was of invalid type or length.
	ErrBadResult = NewQueryError(0, "invalid result")
	// ErrVectorClock is returned in case of bad manipulation with the vector clock.
	ErrVectorClock = NewQueryError(0, "vclock manipulation")
	// ErrUnknownError is returned when ErrorCode isn't OK but Error is nil in Result.
	ErrUnknownError = NewQueryError(ErrUnknown, "unknown error")
)
var (
	ErrPortAlreadyInUse = errors.New("Port already in use")
)

Functions

func VersionID

func VersionID(major, minor, patch uint32) uint32
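
VersionID has no description here. A plausible reading, given the "Tarantool >= 1.7.2" notes on some constants, is that it packs a version into a single comparable integer. The sketch below mirrors Tarantool's own version_id macro (one byte each for minor and patch, major above them); it is an assumption that the package's function computes the same value, so the helper is named versionID to keep it distinct.

```go
package main

import "fmt"

// versionID packs major, minor and patch into one integer so that versions
// can be compared numerically. Assumption: this mirrors Tarantool's
// version_id macro; the package's VersionID may differ.
func versionID(major, minor, patch uint32) uint32 {
	return (((major << 8) | minor) << 8) | patch
}

func main() {
	fmt.Printf("%#x\n", versionID(1, 7, 2))

	// A server reporting 1.9.0 is new enough for features that need 1.7.2.
	fmt.Println(versionID(1, 9, 0) >= versionID(1, 7, 2))
}
```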

Types

type AsyncResult

type AsyncResult struct {
	ErrorCode    uint
	Error        error
	BinaryPacket *BinaryPacket
	Connection   *Connection
	Opaque       interface{}
}

type Auth

type Auth struct {
	User         string
	Password     string
	GreetingAuth []byte
}

func (*Auth) GetCommandID

func (auth *Auth) GetCommandID() uint

func (*Auth) MarshalMsg

func (auth *Auth) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Auth) UnmarshalMsg

func (auth *Auth) UnmarshalMsg(data []byte) (buf []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type BinaryPacket

type BinaryPacket struct {
	// contains filtered or unexported fields
}

func (*BinaryPacket) Bytes

func (pp *BinaryPacket) Bytes() []byte

func (*BinaryPacket) ReadFrom

func (pp *BinaryPacket) ReadFrom(r io.Reader) (n int64, err error)

ReadFrom implements the io.ReaderFrom interface

func (*BinaryPacket) Release

func (pp *BinaryPacket) Release()

func (*BinaryPacket) Reset

func (pp *BinaryPacket) Reset()

func (*BinaryPacket) Result

func (pp *BinaryPacket) Result() *Result

func (*BinaryPacket) Unmarshal

func (pp *BinaryPacket) Unmarshal() error

func (*BinaryPacket) UnmarshalCustomBody

func (pp *BinaryPacket) UnmarshalCustomBody(um UnmarshalBinaryBodyFunc) (err error)

func (*BinaryPacket) WriteTo

func (pp *BinaryPacket) WriteTo(w io.Writer) (n int64, err error)

WriteTo implements the io.WriterTo interface

type BinaryPacketPool

type BinaryPacketPool struct {
	// contains filtered or unexported fields
}

func (*BinaryPacketPool) Close

func (p *BinaryPacketPool) Close()

func (*BinaryPacketPool) Get

func (p *BinaryPacketPool) Get() *BinaryPacket

func (*BinaryPacketPool) GetWithID

func (p *BinaryPacketPool) GetWithID(requestID uint64) (pp *BinaryPacket)

func (*BinaryPacketPool) Put

func (p *BinaryPacketPool) Put(pp *BinaryPacket)

type Box

type Box struct {
	Root    string
	WorkDir string
	Port    uint
	Listen  string
	// contains filtered or unexported fields
}

Box is a Tarantool instance. It is used to start and stop Tarantool in tests.

func NewBox

func NewBox(config string, options *BoxOptions) (*Box, error)

func (*Box) Addr

func (box *Box) Addr() string

func (*Box) Close

func (box *Box) Close()

func (*Box) Connect

func (box *Box) Connect(options *Options) (*Connection, error)

func (*Box) IsStopped

func (box *Box) IsStopped() bool

func (*Box) Start

func (box *Box) Start() error

func (*Box) StartWithLua

func (box *Box) StartWithLua(luaTransform func(string) string) error

func (*Box) Stop

func (box *Box) Stop()

func (*Box) Version

func (box *Box) Version() (string, error)

type BoxOptions

type BoxOptions struct {
	Host    string
	Port    uint
	PortMin uint
	PortMax uint
	WorkDir string

	LogDir        string
	LogNamePrefix string
}

type Bytes

type Bytes []byte

type Call

type Call struct {
	Name  string
	Tuple []interface{}
}

func (*Call) GetCommandID

func (q *Call) GetCommandID() uint

func (*Call) MarshalMsg

func (q *Call) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Call) UnmarshalMsg

func (q *Call) UnmarshalMsg(data []byte) (buf []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type Call17

type Call17 struct {
	Name  string
	Tuple []interface{}
}

Call17 is available since Tarantool >= 1.7.2

func (*Call17) GetCommandID

func (q *Call17) GetCommandID() uint

func (*Call17) MarshalMsg

func (q *Call17) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Call17) UnmarshalMsg

func (q *Call17) UnmarshalMsg(data []byte) (buf []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

func Connect

func Connect(dsnString string, options *Options) (conn *Connection, err error)

Connect to tarantool instance with options

func ConnectContext

func ConnectContext(ctx context.Context, dsnString string, options *Options) (conn *Connection, err error)

Connect to tarantool instance with options using the provided context. Returned Connection can be used to execute queries.

func (*Connection) Close

func (conn *Connection) Close()

func (*Connection) Exec

func (conn *Connection) Exec(ctx context.Context, q Query, options ...ExecOption) (result *Result)

func (*Connection) ExecAsync

func (conn *Connection) ExecAsync(ctx context.Context, q Query, opaque interface{}, replyChan chan *AsyncResult) error

func (*Connection) Execute

func (conn *Connection) Execute(q Query) ([][]interface{}, error)

func (*Connection) GetPerf

func (conn *Connection) GetPerf() PerfCount

func (*Connection) GetPrimaryKeyFields

func (conn *Connection) GetPrimaryKeyFields(space interface{}) ([]int, bool)

func (*Connection) IsClosed

func (conn *Connection) IsClosed() bool

func (*Connection) String

func (conn *Connection) String() string

type ConnectionError

type ConnectionError struct {
	// contains filtered or unexported fields
}

ConnectionError is returned when something has happened to the connection.

func ConnectionClosedError

func ConnectionClosedError(con *Connection) *ConnectionError

ConnectionClosedError returns a ConnectionError with a message about the closed connection, or an error depending on the connection state. It also includes remoteAddr in the error text.

func NewConnectionError

func NewConnectionError(con *Connection, message string) *ConnectionError

NewConnectionError returns ConnectionError with message and remoteAddr in error text.

func (*ConnectionError) Temporary

func (e *ConnectionError) Temporary() bool

Temporary implements Error interface.

func (*ConnectionError) Timeout

func (e *ConnectionError) Timeout() bool

Timeout implements net.Error interface.

type Connector

type Connector struct {
	sync.Mutex
	RemoteAddr string
	// contains filtered or unexported fields
}

func New

func New(dsnString string, options *Options) *Connector

New returns a new Connector instance.

func (*Connector) Close

func (c *Connector) Close()

Close underlying connection.

func (*Connector) Connect

func (c *Connector) Connect() (conn *Connection, err error)

Connect returns the existing connection or establishes a new one.

func (*Connector) ConnectContext

func (c *Connector) ConnectContext(ctx context.Context) (conn *Connection, err error)

ConnectContext returns the existing connection or establishes a new one using the provided context.

type ContextError

type ContextError struct {
	CtxErr error
	// contains filtered or unexported fields
}

ContextError is returned when a request has ended because its context timed out or was cancelled.

func NewContextError

func NewContextError(ctx context.Context, con *Connection, message string) *ContextError

NewContextError returns a ContextError with the message and remoteAddr in the error text. It also carries the context error itself in CtxErr.

func (*ContextError) Temporary

func (e *ContextError) Temporary() bool

Temporary implements Error interface.

func (*ContextError) Timeout

func (e *ContextError) Timeout() bool

Timeout implements net.Error interface.

type CountedReader

type CountedReader struct {
	// contains filtered or unexported fields
}

func NewCountedReader

func NewCountedReader(r io.Reader, c *expvar.Int) *CountedReader

func (*CountedReader) Read

func (cr *CountedReader) Read(p []byte) (int, error)

type CountedWriter

type CountedWriter struct {
	// contains filtered or unexported fields
}

func NewCountedWriter

func NewCountedWriter(w io.Writer, c *expvar.Int) *CountedWriter

func (*CountedWriter) Write

func (cw *CountedWriter) Write(p []byte) (int, error)

type Delete

type Delete struct {
	Space    interface{}
	Index    interface{}
	Key      interface{}
	KeyTuple []interface{}
}

func (*Delete) GetCommandID

func (q *Delete) GetCommandID() uint

func (*Delete) MarshalMsg

func (q *Delete) MarshalMsg(b []byte) (data []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Delete) UnmarshalMsg

func (q *Delete) UnmarshalMsg(data []byte) (buf []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type Error

type Error interface {
	error
	Temporary() bool // Temporary true if the error is temporary
}

Error has a Temporary method which returns true if the error is temporary. It is useful for quickly deciding whether or not to retry.
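
A minimal sketch of the retry decision that Temporary enables. The Error interface is copied from above; tempErr and retry are hypothetical names introduced only for this illustration, and a real caller would usually add a delay between attempts.

```go
package main

import (
	"errors"
	"fmt"
)

// Error mirrors the package's Error interface.
type Error interface {
	error
	Temporary() bool
}

// tempErr is a hypothetical error type used only for this sketch.
type tempErr struct {
	msg       string
	temporary bool
}

func (e *tempErr) Error() string   { return e.msg }
func (e *tempErr) Temporary() bool { return e.temporary }

// retry calls fn up to attempts times and retries only when the returned
// error reports itself as temporary.
func retry(attempts int, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		var te Error
		if !errors.As(err, &te) || !te.Temporary() {
			return err // permanent error: give up immediately
		}
	}
	return err
}

func main() {
	calls := 0
	err := retry(3, func() error {
		calls++
		if calls < 3 {
			return &tempErr{msg: "timeout", temporary: true}
		}
		return nil
	})
	fmt.Println("calls:", calls, "err:", err)
}
```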

type Eval

type Eval struct {
	Expression string
	Tuple      []interface{}
}

Eval query

func (*Eval) GetCommandID

func (q *Eval) GetCommandID() uint

func (*Eval) MarshalMsg

func (q *Eval) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Eval) UnmarshalMsg

func (q *Eval) UnmarshalMsg(data []byte) (buf []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type ExecOption

type ExecOption interface {
	// contains filtered or unexported methods
}

func OpaqueExecOption

func OpaqueExecOption(opaque interface{}) ExecOption

type Greeting

type Greeting struct {
	Version uint32
	Auth    []byte
}

type Insert

type Insert struct {
	Space interface{}
	Tuple []interface{}
}

func (*Insert) GetCommandID

func (q *Insert) GetCommandID() uint

func (*Insert) MarshalMsg

func (q *Insert) MarshalMsg(b []byte) (data []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Insert) UnmarshalMsg

func (q *Insert) UnmarshalMsg(data []byte) (buf []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type IprotoServer

type IprotoServer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewIprotoServer

func NewIprotoServer(uuid string, handler QueryHandler, onShutdown OnShutdownCallback) *IprotoServer

func (*IprotoServer) Accept

func (s *IprotoServer) Accept(conn net.Conn)

func (*IprotoServer) CheckAuth

func (s *IprotoServer) CheckAuth(hash []byte, password string) bool

func (*IprotoServer) Shutdown

func (s *IprotoServer) Shutdown() error

func (*IprotoServer) WithOptions

func (s *IprotoServer) WithOptions(opts *IprotoServerOptions) *IprotoServer

type IprotoServerOptions

type IprotoServerOptions struct {
	Perf PerfCount
}

type Iterator

type Iterator struct {
	Iter uint8
}

func (Iterator) String

func (it Iterator) String() string

type Join

type Join struct {
	UUID string
}

Join is the JOIN command

func (*Join) GetCommandID

func (q *Join) GetCommandID() uint

func (*Join) MarshalMsg

func (q *Join) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Join) UnmarshalMsg

func (q *Join) UnmarshalMsg([]byte) (buf []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type OnShutdownCallback

type OnShutdownCallback func(err error)

type OpAdd

type OpAdd struct {
	Field    uint64
	Argument int64
}

func (*OpAdd) AsTuple

func (op *OpAdd) AsTuple() []interface{}

type OpAssign

type OpAssign struct {
	Field    uint64
	Argument interface{}
}

func (*OpAssign) AsTuple

func (op *OpAssign) AsTuple() []interface{}

type OpBitAND

type OpBitAND struct {
	Field    uint64
	Argument uint64
}

func (*OpBitAND) AsTuple

func (op *OpBitAND) AsTuple() []interface{}

type OpBitOR

type OpBitOR struct {
	Field    uint64
	Argument uint64
}

func (*OpBitOR) AsTuple

func (op *OpBitOR) AsTuple() []interface{}

type OpBitXOR

type OpBitXOR struct {
	Field    uint64
	Argument uint64
}

func (*OpBitXOR) AsTuple

func (op *OpBitXOR) AsTuple() []interface{}

type OpDelete

type OpDelete struct {
	From  uint64
	Count uint64
}

func (*OpDelete) AsTuple

func (op *OpDelete) AsTuple() []interface{}

type OpInsert

type OpInsert struct {
	Before   uint64
	Argument interface{}
}

func (*OpInsert) AsTuple

func (op *OpInsert) AsTuple() []interface{}

type OpSplice

type OpSplice struct {
	Field    uint64
	Offset   uint64
	Position uint64
	Argument string
}

func (*OpSplice) AsTuple

func (op *OpSplice) AsTuple() []interface{}

type OpSub

type OpSub struct {
	Field    uint64
	Argument int64
}

func (*OpSub) AsTuple

func (op *OpSub) AsTuple() []interface{}

type Operator

type Operator interface {
	AsTuple() []interface{}
}
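A minimal sketch of a type satisfying Operator. The interface is redeclared locally, and addOp and encodeOps are hypothetical names. The [operator, field, argument] layout follows the way Tarantool's binary protocol describes an update operation; that the package's own OpAdd produces exactly this tuple is an assumption, since the listing here does not show it.

```go
package main

import "fmt"

// Operator mirrors the interface documented above.
type Operator interface {
	AsTuple() []interface{}
}

// addOp is a hypothetical stand-in for an operator such as OpAdd.
type addOp struct {
	Field    uint64
	Argument int64
}

// AsTuple encodes the operation as [operator, field, argument].
// Assumption: this layout is illustrative, not taken from the package.
func (op *addOp) AsTuple() []interface{} {
	return []interface{}{"+", op.Field, op.Argument}
}

// encodeOps converts a list of operators into a list of tuples,
// one tuple per operation, in the order given.
func encodeOps(ops []Operator) []interface{} {
	out := make([]interface{}, 0, len(ops))
	for _, op := range ops {
		out = append(out, op.AsTuple())
	}
	return out
}

func main() {
	ops := []Operator{&addOp{Field: 2, Argument: 5}}
	fmt.Println(encodeOps(ops))
}
```

Because Update.Set and Upsert.Set are declared as []Operator, any type with an AsTuple method can be placed in those slices.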

type Options

type Options struct {
	ConnectTimeout time.Duration
	QueryTimeout   time.Duration
	DefaultSpace   string
	User           string
	Password       string
	UUID           string
	ReplicaSetUUID string
	Perf           PerfCount

	// PoolMaxPacketSize describes maximum size of packet buffer
	// that can be added to packet pool.
	// If the packet size is 0, option is ignored.
	PoolMaxPacketSize int
}

type Packet

type Packet struct {
	Cmd uint
	LSN uint64

	InstanceID uint32
	Timestamp  time.Time
	Request    Query
	Result     *Result
	// contains filtered or unexported fields
}

func (*Packet) String

func (pack *Packet) String() string

func (*Packet) UnmarshalBinary

func (pack *Packet) UnmarshalBinary(data []byte) error

UnmarshalBinary implements encoding.BinaryUnmarshaler

func (*Packet) UnmarshalBinaryBody

func (pack *Packet) UnmarshalBinaryBody(data []byte) (buf []byte, err error)

func (*Packet) UnmarshalBinaryHeader

func (pack *Packet) UnmarshalBinaryHeader(data []byte) (buf []byte, err error)

func (*Packet) UnmarshalMsg

func (pack *Packet) UnmarshalMsg(data []byte) (buf []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type PacketIterator

type PacketIterator interface {
	Next() (*Packet, error)
}

PacketIterator is a wrapper around Slave that provides iteration over new Packets.

type PerfCount

type PerfCount struct {
	NetRead       *expvar.Int
	NetWrite      *expvar.Int
	NetPacketsIn  *expvar.Int
	NetPacketsOut *expvar.Int
	QueryTimeouts *expvar.Int
	QueryComplete QueryCompleteFn
}

type Ping

type Ping struct {
}

func (*Ping) GetCommandID

func (q *Ping) GetCommandID() uint

func (*Ping) MarshalMsg

func (q *Ping) MarshalMsg(b []byte) ([]byte, error)

MarshalMsg implements msgp.Marshaler

func (*Ping) UnmarshalMsg

func (q *Ping) UnmarshalMsg([]byte) (buf []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type Query

type Query interface {
	GetCommandID() uint
}

func NewQuery

func NewQuery(cmd uint) Query

type QueryCompleteFn

type QueryCompleteFn func(interface{}, time.Duration)

type QueryError

type QueryError struct {
	Code uint
	// contains filtered or unexported fields
}

QueryError is returned when a query error occurs. It carries the error Code.

func NewQueryError

func NewQueryError(code uint, message string) *QueryError

NewQueryError returns a QueryError with the given message and Code.

func (*QueryError) Temporary

func (e *QueryError) Temporary() bool

Temporary implements Error interface.

func (*QueryError) Timeout

func (e *QueryError) Timeout() bool

Timeout implements net.Error interface.

type QueryHandler

type QueryHandler func(queryContext context.Context, query Query) *Result

type Replace

type Replace struct {
	Space interface{}
	Tuple []interface{}
}

func (*Replace) GetCommandID

func (q *Replace) GetCommandID() uint

func (*Replace) MarshalMsg

func (q *Replace) MarshalMsg(b []byte) ([]byte, error)

MarshalMsg implements msgp.Marshaler

func (*Replace) UnmarshalMsg

func (q *Replace) UnmarshalMsg(data []byte) (buf []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type ReplicaSet

type ReplicaSet struct {
	UUID      string
	Instances []string // Instances is read-only set of the instances uuid
}

ReplicaSet is used to store params of the Replica Set.

func NewReplicaSet

func NewReplicaSet() ReplicaSet

NewReplicaSet returns an empty ReplicaSet.

func (*ReplicaSet) Has

func (rs *ReplicaSet) Has(id uint32) bool

Has reports whether the ReplicaSet contains the specified instance.

func (*ReplicaSet) SetInstance

func (rs *ReplicaSet) SetInstance(id uint32, uuid string) bool

SetInstance stores the uuid of the instance with the given id in the instance set.

type Result

type Result struct {
	ErrorCode uint
	Error     error
	Data      [][]interface{}
}

func (*Result) GetCommandID

func (r *Result) GetCommandID() uint

func (*Result) MarshalMsg

func (r *Result) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Result) String

func (r *Result) String() string

func (*Result) UnmarshalMsg

func (r *Result) UnmarshalMsg(data []byte) (buf []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type Select

type Select struct {
	Space    interface{}
	Index    interface{}
	Offset   uint32
	Limit    uint32
	Iterator uint8
	Key      interface{}
	KeyTuple []interface{}
}

func (*Select) GetCommandID

func (q *Select) GetCommandID() uint

func (*Select) MarshalMsg

func (q *Select) MarshalMsg(b []byte) (data []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Select) UnmarshalMsg

func (q *Select) UnmarshalMsg(data []byte) (buf []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type Slave

type Slave struct {
	UUID       string
	VClock     VectorClock
	ReplicaSet ReplicaSet
	// contains filtered or unexported fields
}

Slave connects to a Tarantool 1.6, 1.7 or 1.10 instance and subscribes for changes. The Tarantool instance acting as a master sees the Slave like any other replica in the replication set. A Slave can't be used concurrently; route responses from the returned channel instead.

Example (SubscribeExisted)
package main

import (
	"log"
	"strings"

	tnt16 "github.com/viciious/go-tarantool"
)

func main() {
	// Subscribe for master's changes synchronously

	// new slave instance connects to provided dsn instantly
	s, err := tnt16.NewSlave("127.0.0.1:8000", tnt16.Options{
		User:     "username",
		Password: "password",
		// UUID of the instance in replica set. Required
		UUID: "7c025e42-2394-11e7-aacf-0242ac110002",
		// UUID of the Replica Set. Required
		ReplicaSetUUID: "3b39c6a4-f2da-4d81-a43b-103e5b1c16a1"})
	if err != nil {
		log.Printf("Tnt Slave creating error:%v", err)
		return
	}
	// always close slave to preserve socket descriptor
	defer s.Close()

	// let's start from the beginning
	var lsn uint64 = 0
	it, err := s.Subscribe(lsn)
	if err != nil {
		log.Printf("Tnt Slave subscribing error:%v", err)
		return
	}

	// print snapshot
	var p *tnt16.Packet
	var hr = strings.Repeat("-", 80)
	// iterate over master's changes permanently
	for {
		p, err = it.Next()
		if err != nil {
			log.Printf("Tnt Slave iterating error:%v", err)
			return
		}
		log.Println(p)
		log.Println(hr)
	}
}

Example (SubscribeNew)
package main

import (
	"log"
	"strings"

	tnt16 "github.com/viciious/go-tarantool"
)

func main() {
	// Silently join slave to Replica Set and consume master's changes synchronously

	// new slave instance connects to provided dsn instantly
	s, err := tnt16.NewSlave("username:password@127.0.0.1:8000")
	if err != nil {
		log.Printf("Tnt Slave creating error:%v", err)
		return
	}
	// always close slave to preserve socket descriptor
	defer s.Close()

	// let's start from the beginning
	it, err := s.Attach()
	if err != nil {
		log.Printf("Tnt Slave subscribing error:%v", err)
		return
	}

	// print snapshot
	var p *tnt16.Packet
	var hr = strings.Repeat("-", 80)
	// iterate over master's changes permanently
	for {
		p, err = it.Next()
		if err != nil {
			log.Printf("Tnt Slave iterating error:%v", err)
			return
		}
		log.Println(p)
		log.Println(hr)
	}
}

func NewSlave

func NewSlave(uri string, opts ...Options) (s *Slave, err error)

NewSlave creates a Slave instance for the given Tarantool master URI. The URI is parsed by the url package and therefore should contain a scheme supported by net.Dial.
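To illustrate what parsing with the url package yields, here is a small standard-library sketch that splits a URI with an explicit scheme into the parts a dialer would need. splitURI is a hypothetical helper, not a function of this package, and how NewSlave treats scheme-less addresses such as the ones in the examples is not covered here.

```go
package main

import (
	"fmt"
	"net/url"
)

// splitURI is a hypothetical helper that extracts the network (scheme),
// the host:port address and the credentials from a URI.
func splitURI(uri string) (network, addr, user, pass string, err error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", "", "", "", err
	}
	if u.User != nil {
		user = u.User.Username()
		pass, _ = u.User.Password()
	}
	return u.Scheme, u.Host, user, pass, nil
}

func main() {
	network, addr, user, pass, err := splitURI("tcp://username:password@127.0.0.1:8000")
	fmt.Println(network, addr, user, pass, err)
}
```

The scheme ("tcp" here) maps directly onto the network argument of net.Dial, which is why the URI is expected to carry one.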

func (*Slave) Attach

func (s *Slave) Attach(out ...chan *Packet) (it PacketIterator, err error)

Attach attaches the Slave to the Replica Set and subscribes for new (!) DML requests only. Pass an out chan to receive packets asynchronously, or use the returned PacketIterator to read them synchronously. If you need all requests in the chan, use JoinWithSnap(chan) and then s.Subscribe(s.VClock[1:]...).

Example (Async)
package main

import (
	"log"
	"strings"
	"sync"

	tnt16 "github.com/viciious/go-tarantool"
)

func main() {
	// Silently join slave to Replica Set and consume master's changes asynchronously

	// new slave instance connects to provided dsn instantly
	s, err := tnt16.NewSlave("username:password@127.0.0.1:8000")
	if err != nil {
		log.Printf("Tnt Slave creating error:%v", err)
		return
	}
	// always close slave to preserve socket descriptor
	defer s.Close()

	// chan for xlog packets
	xlogChan := make(chan *tnt16.Packet, 128)
	wg := &sync.WaitGroup{}

	// run xlog printer before subscribing command
	wg.Add(1)
	go func(in <-chan *tnt16.Packet, wg *sync.WaitGroup) {
		defer wg.Done()

		var hr = strings.Repeat("-", 80)

		for p := range in {
			log.Println(hr)
			switch q := p.Request.(type) {
			case *tnt16.Insert:
				switch q.Space {
				case tnt16.SpaceIndex, tnt16.SpaceSpace:
					// short default format
					log.Printf("Insert LSN:%v, Space:%v InstanceID:%v\n",
						p.LSN, q.Space, p.InstanceID)
				default:
					log.Printf("%v", p)
				}
			default:
				log.Printf("%v", p)
			}
		}
	}(xlogChan, wg)

	// let's start from the beginning
	_, err = s.Attach(xlogChan)
	if err != nil {
		log.Printf("Tnt Slave subscribing error:%v", err)
		return
	}

	// consume master's changes permanently
	wg.Wait()
}

Example (Sync)
package main

import (
	"log"
	"strings"

	tnt16 "github.com/viciious/go-tarantool"
)

func main() {
	// Silently join slave to Replica Set and consume master's changes synchronously

	// new slave instance connects to provided dsn instantly
	s, err := tnt16.NewSlave("username:password@127.0.0.1:8000")
	if err != nil {
		log.Printf("Tnt Slave creating error:%v", err)
		return
	}
	// always close slave to preserve socket descriptor
	defer s.Close()

	// let's start from the beginning
	it, err := s.Attach()
	if err != nil {
		log.Printf("Tnt Slave subscribing error:%v", err)
		return
	}

	// print snapshot
	var p *tnt16.Packet
	var hr = strings.Repeat("-", 80)
	// consume master's changes permanently
	for {
		p, err = it.Next()
		if err != nil {
			log.Printf("Tnt Slave consuming error:%v", err)
			return
		}
		log.Println(hr)
		switch q := p.Request.(type) {
		case *tnt16.Insert:
			switch q.Space {
			case tnt16.SpaceIndex, tnt16.SpaceSpace:
				// short default format
				log.Printf("Insert LSN:%v, Space:%v InstanceID:%v\n",
					p.LSN, q.Space, p.InstanceID)
			default:
				log.Printf("%v", p)
			}
		default:
			log.Printf("%v", p)
		}
	}
}

func (*Slave) Close

func (s *Slave) Close() error

Close closes the Slave connection to the Master.

func (*Slave) Err

func (s *Slave) Err() error

Err returns the error encountered by the HasNext method.

func (*Slave) HasNext

func (s *Slave) HasNext() bool

HasNext implements a bufio.Scanner-style (Scan) iterator.

func (*Slave) IsInReplicaSet

func (s *Slave) IsInReplicaSet() bool

IsInReplicaSet checks whether Slave has Replica Set params or not.

func (*Slave) Join

func (s *Slave) Join() (err error)

Join joins the Replica Set using the Master instance.

Example
package main

import (
	"log"

	tnt16 "github.com/viciious/go-tarantool"
)

func main() {
	// Silently join slave to Replica Set

	// new slave instance connects to provided dsn instantly
	s, err := tnt16.NewSlave("username:password@127.0.0.1:8000")
	if err != nil {
		log.Printf("Tnt Slave creating error:%v", err)
		return
	}
	// always close slave to preserve socket descriptor
	defer s.Close()

	if err = s.Join(); err != nil {
		log.Printf("Tnt Slave joining error:%v", err)
		return
	}

	log.Printf("UUID=%#v Replica Set UUID=%#v\n", s.UUID, s.ReplicaSet.UUID)
}

func (*Slave) JoinWithSnap

func (s *Slave) JoinWithSnap(out ...chan *Packet) (it PacketIterator, err error)

JoinWithSnap joins the Replica Set using the Master instance. Snapshot logs are available through the given out channel or the returned PacketIterator. (In fact, the Slave itself is returned, wrapped as a PacketIterator.)

Example (Async)
package main

import (
	"log"
	"strings"
	"sync"

	tnt16 "github.com/viciious/go-tarantool"
)

func main() {
	// Join slave to Replica Set with iterating snapshot asynchronously

	// new slave instance connects to provided dsn instantly
	s, err := tnt16.NewSlave("username:password@127.0.0.1:8000")
	if err != nil {
		log.Printf("Tnt Slave creating error:%v", err)
		return
	}
	// always close slave to preserve socket descriptor
	defer s.Close()

	// chan for snapshot's packets
	snapChan := make(chan *tnt16.Packet, 128)
	wg := &sync.WaitGroup{}

	// run snapshot printer before join command
	wg.Add(1)
	go func(in <-chan *tnt16.Packet, wg *sync.WaitGroup) {
		defer wg.Done()

		var hr = strings.Repeat("-", 80)

		for p := range in {
			log.Println(hr)
			switch q := p.Request.(type) {
			case *tnt16.Insert:
				switch q.Space {
				case tnt16.SpaceIndex, tnt16.SpaceSpace:
					// short default format
					log.Printf("Insert LSN:%v, Space:%v InstanceID:%v\n",
						p.LSN, q.Space, p.InstanceID)
				default:
					log.Printf("%v", p)
				}
			default:
				log.Printf("%v", p)
			}
		}
	}(snapChan, wg)

	_, err = s.JoinWithSnap(snapChan)
	if err != nil {
		log.Printf("Tnt Slave joining error:%v", err)
		return
	}

	wg.Wait()

	log.Printf("UUID=%#v Replica Set UUID=%#v\n", s.UUID, s.ReplicaSet.UUID)
}

Example (Sync)
package main

import (
	"log"
	"strings"

	tnt16 "github.com/viciious/go-tarantool"
)

func main() {
	// Join slave to Replica Set with iterating snapshot synchronously

	// new slave instance connects to provided dsn instantly
	s, err := tnt16.NewSlave("username:password@127.0.0.1:8000")
	if err != nil {
		log.Printf("Tnt Slave creating error:%v", err)
		return
	}
	// always close slave to preserve socket descriptor
	defer s.Close()

	// skip returned iterator; will be using self bufio.scanner-style iterator instead
	_, err = s.JoinWithSnap()
	if err != nil {
		log.Printf("Tnt Slave joining error:%v", err)
		return
	}

	// print snapshot
	var p *tnt16.Packet
	var hr = strings.Repeat("-", 80)
	for s.HasNext() {
		p = s.Packet()
		// print request
		log.Println(hr)
		switch q := p.Request.(type) {
		case *tnt16.Insert:
			switch q.Space {
			case tnt16.SpaceIndex, tnt16.SpaceSpace:
				// short default format
				log.Printf("Insert LSN:%v, Space:%v InstanceID:%v\n",
					p.LSN, q.Space, p.InstanceID)
			default:
				log.Printf("%v", p)
			}
		default:
			log.Printf("%v", p)
		}
	}
	// always check for errors after the iteration cycle
	if err = s.Err(); err != nil {
		log.Printf("Tnt Slave joining error:%v", err)
		return
	}

	log.Printf("UUID=%#v Replica Set UUID=%#v\n", s.UUID, s.ReplicaSet.UUID)
}

func (*Slave) LastSnapVClock

func (s *Slave) LastSnapVClock() (VectorClock, error)

func (*Slave) Next

func (s *Slave) Next() (*Packet, error)

Next implements PacketIterator interface.

func (*Slave) Packet

func (s *Slave) Packet() *Packet

Packet returns the packet fetched by the HasNext method.

func (*Slave) Subscribe

func (s *Slave) Subscribe(lsns ...uint64) (it PacketIterator, err error)

Subscribe subscribes for DML requests (insert, update, delete, replace, upsert) starting from the given vector clock. The variadic lsns form the start vector clock: each lsn is one clock in the vector, in order. One lsn is enough for a master-slave replica set. The Replica Set UUID and the Slave's own UUID must be set before calling Subscribe; set them through the Options passed to NewSlave or by calling Join. Requests are read synchronously through the returned PacketIterator; the signature above takes no out channel, so for asynchronous processing forward the packets to a channel yourself.

Example (Async)
package main

import (
	"log"
	"strings"

	tnt16 "github.com/viciious/go-tarantool"
)

func main() {
	// Subscribe for master's changes asynchronously

	// new slave instance connects to provided dsn instantly
	s, err := tnt16.NewSlave("127.0.0.1:8000", tnt16.Options{
		User:     "username",
		Password: "password",
		// UUID of the instance in replica set. Required
		UUID: "7c025e42-2394-11e7-aacf-0242ac110002",
		// UUID of the Replica Set. Required
		ReplicaSetUUID: "3b39c6a4-f2da-4d81-a43b-103e5b1c16a1"})
	if err != nil {
		log.Printf("Tnt Slave creating error:%v", err)
		return
	}
	// always close slave to preserve socket descriptor
	defer s.Close()

	// chan for xlog packets
	xlogChan := make(chan *tnt16.Packet, 128)

	// run xlog printer before subscribing command
	go func(in <-chan *tnt16.Packet) {
		var hr = strings.Repeat("-", 80)

		for p := range in {
			log.Println(hr)
			switch q := p.Request.(type) {
			case *tnt16.Insert:
				switch q.Space {
				case tnt16.SpaceIndex, tnt16.SpaceSpace:
					// short default format
					log.Printf("Insert LSN:%v, Space:%v InstanceID:%v\n",
						p.LSN, q.Space, p.InstanceID)
				default:
					log.Printf("%v", p)
				}
			default:
				log.Printf("%v", p)
			}
		}
	}(xlogChan)

	// let's start from the beginning
	var lsn uint64 = 0
	it, err := s.Subscribe(lsn)
	if err != nil {
		log.Printf("Tnt Slave subscribing error:%v", err)
		return
	}

	// consume requests infinitely
	var p *tnt16.Packet
	for {
		p, err = it.Next()
		if err != nil {
			close(xlogChan)
			log.Printf("Tnt Slave consuming error:%v", err)
			return
		}
		xlogChan <- p
	}
}

Example (Sync)
package main

import (
	"log"
	"strings"

	tnt16 "github.com/viciious/go-tarantool"
)

func main() {
	// Subscribe for master's changes synchronously

	// new slave instance connects to provided dsn instantly
	s, err := tnt16.NewSlave("127.0.0.1:8000", tnt16.Options{
		User:     "username",
		Password: "password",
		// UUID of the instance in replica set. Required
		UUID: "7c025e42-2394-11e7-aacf-0242ac110002",
		// UUID of the Replica Set. Required
		ReplicaSetUUID: "3b39c6a4-f2da-4d81-a43b-103e5b1c16a1"})
	if err != nil {
		log.Printf("Tnt Slave creating error:%v", err)
		return
	}
	// always close slave to preserve socket descriptor
	defer s.Close()

	// let's start from the beginning
	var lsn uint64 = 0
	it, err := s.Subscribe(lsn)
	if err != nil {
		log.Printf("Tnt Slave subscribing error:%v", err)
		return
	}

	// print snapshot
	var p *tnt16.Packet
	var hr = strings.Repeat("-", 80)
	// consume master's changes permanently
	for {
		p, err = it.Next()
		if err != nil {
			log.Printf("Tnt Slave consuming error:%v", err)
			return
		}
		log.Println(hr)
		switch q := p.Request.(type) {
		case *tnt16.Insert:
			switch q.Space {
			case tnt16.SpaceIndex, tnt16.SpaceSpace:
				// short default format
				log.Printf("Insert LSN:%v, Space:%v InstanceID:%v\n",
					p.LSN, q.Space, p.InstanceID)
			default:
				log.Printf("%v", p)
			}
		default:
			log.Printf("%v", p)
		}
	}
}

func (*Slave) Version

func (s *Slave) Version() uint32

type Subscribe

type Subscribe struct {
	UUID           string
	ReplicaSetUUID string
	VClock         VectorClock
}

Subscribe is the SUBSCRIBE command

func (*Subscribe) GetCommandID

func (q *Subscribe) GetCommandID() uint

func (*Subscribe) MarshalMsg

func (q *Subscribe) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Subscribe) UnmarshalMsg

func (q *Subscribe) UnmarshalMsg([]byte) (buf []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type Tuple

type Tuple []interface{}

type UnmarshalBinaryBodyFunc

type UnmarshalBinaryBodyFunc func(*Packet, []byte) error

type Update

type Update struct {
	Space    interface{}
	Index    interface{}
	Key      interface{}
	KeyTuple []interface{}
	Set      []Operator
}

func (*Update) GetCommandID

func (q *Update) GetCommandID() uint

func (*Update) MarshalMsg

func (q *Update) MarshalMsg(b []byte) ([]byte, error)

MarshalMsg implements msgp.Marshaler

func (*Update) UnmarshalMsg

func (q *Update) UnmarshalMsg(data []byte) ([]byte, error)

UnmarshalMsg implements msgp.Unmarshaler

type Upsert

type Upsert struct {
	Space interface{}
	Tuple []interface{}
	Set   []Operator
}

func (*Upsert) GetCommandID

func (q *Upsert) GetCommandID() uint

func (*Upsert) MarshalMsg

func (q *Upsert) MarshalMsg(b []byte) ([]byte, error)

MarshalMsg implements msgp.Marshaler

func (*Upsert) UnmarshalMsg

func (q *Upsert) UnmarshalMsg(data []byte) ([]byte, error)

UnmarshalMsg implements msgp.Unmarshaler

type VClock

type VClock struct {
	RequestID  uint64 // RequestID is SYNC field;
	InstanceID uint32
	VClock     VectorClock
}

VClock is the response carried in an OK packet. It is similar to the Result struct.

func (*VClock) GetCommandID

func (p *VClock) GetCommandID() uint

func (*VClock) MarshalMsg

func (p *VClock) MarshalMsg(b []byte) ([]byte, error)

MarshalMsg implements msgp.Marshaler

func (*VClock) String

func (p *VClock) String() string

String implements Stringer interface.

func (*VClock) UnmarshalBinaryBody

func (p *VClock) UnmarshalBinaryBody(data []byte) (buf []byte, err error)

func (*VClock) UnmarshalBinaryHeader

func (p *VClock) UnmarshalBinaryHeader(data []byte) (buf []byte, err error)

func (*VClock) UnmarshalMsg

func (p *VClock) UnmarshalMsg(data []byte) (buf []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type VectorClock

type VectorClock []uint64

VectorClock is used to store logical clocks (a direct dependency clock implementation). The zero index is always reserved for internal use. You can get any lsn by indexing the VectorClock with the instance ID directly (without any index offset). You can count the instances in the vector with the built-in len function.
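The indexing rules above can be sketched with a local slice type. vectorClock and its methods are stand-ins written for this illustration, not the package's implementation; in particular the real Follow returns a bool whose meaning is not documented here, so the sketch leaves it out.

```go
package main

import "fmt"

// vectorClock is a local stand-in for the documented VectorClock type.
type vectorClock []uint64

// has reports whether the clock has a slot for the given instance ID.
// Index 0 is treated as reserved, following the description above.
func (vc vectorClock) has(id uint32) bool {
	return id > 0 && int(id) < len(vc)
}

// follow stores lsn for the instance, growing the vector when needed.
func (vc *vectorClock) follow(id uint32, lsn uint64) {
	for int(id) >= len(*vc) {
		*vc = append(*vc, 0)
	}
	(*vc)[id] = lsn
}

// sum adds up all clocks; the reserved slot 0 stays zero in this sketch.
func (vc vectorClock) sum() uint64 {
	var total uint64
	for _, lsn := range vc {
		total += lsn
	}
	return total
}

func main() {
	var vc vectorClock
	vc.follow(1, 10) // instance 1 is at lsn 10
	vc.follow(3, 5)  // instance 3 is at lsn 5; slot 2 is created as zero
	fmt.Println(vc, vc[3], vc.has(2), vc.has(4), vc.sum())
}
```

Indexing by instance ID without an offset is what makes the reserved zero slot necessary: instance IDs start at 1, so slot 0 has no owner.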

func NewVectorClock

func NewVectorClock(lsns ...uint64) VectorClock

NewVectorClock returns a VectorClock with clocks equal to the given lsn elements, in order. An empty VectorClock is returned if no lsn elements are given.

func (*VectorClock) Follow

func (vc *VectorClock) Follow(id uint32, lsn uint64) bool

Follow follows the clocks: it updates the vector clock with the given clock part.

func (VectorClock) Has

func (vc VectorClock) Has(id uint32) bool

Has reports whether the VectorClock contains the specified ID.

func (VectorClock) LSN

func (vc VectorClock) LSN() uint64

LSN is the sum of the Clocks.
