vshard_router

Version: v2.0.5
Published: Mar 31, 2025 License: MIT Imports: 18 Imported by: 0

README

Go VShard Router


go-vshard-router is a library for sending requests to a sharded Tarantool cluster directly, without using tarantool-router. The library is based on the router from the Tarantool vshard library and on the go-tarantool connector. go-vshard-router takes a new approach to building your cluster:

graph TD
    %% Old Cluster Schema
    subgraph Old_Cluster_Schema["Old Cluster Schema"]
        direction LR
        subgraph Tarantool Database Cluster
            subgraph Replicaset 1
                Master_001_1
                Replica_001_2
            end
        end

        ROUTER1["Tarantool vshard-router 1_1"] --> Master_001_1
        ROUTER2["Tarantool vshard-router 1_2"] --> Master_001_1
        ROUTER3["Tarantool vshard-router 1_3"] --> Master_001_1
        ROUTER1["Tarantool vshard-router 1_1"] --> Replica_001_2
        ROUTER2["Tarantool vshard-router 1_2"] --> Replica_001_2
        ROUTER3["Tarantool vshard-router 1_3"] --> Replica_001_2

        GO["Golang service"]
        GO --> ROUTER1
        GO --> ROUTER2
        GO --> ROUTER3
    end

    %% New Cluster Schema
    subgraph New_Cluster_Schema["New Cluster Schema"]
        direction LR
        subgraph Application Host
            Golang_Service
        end

        Golang_Service --> |iproto| MASTER1
        Golang_Service --> |iproto| REPLICA1

        MASTER1["Master 001_1"]
        REPLICA1["Replica 001_2"]

        subgraph Tarantool Database Cluster
            subgraph Replicaset 1
                MASTER1
                REPLICA1
            end
        end
    end

Getting started

Prerequisites
  • Go: any one of the two latest major releases (we test it with these).
Getting Go-Vshard-Router

With Go module support, simply add the following import

import "github.com/tarantool/go-vshard-router/v2"

to your code, and then go [build|run|test] will automatically fetch the necessary dependencies.

Otherwise, run the following Go command to install the go-vshard-router package:

$ go get -u github.com/tarantool/go-vshard-router/v2
Running Go-Vshard-Router

First, import the go-vshard-router package, then create a router and call a storage function:

package main

import (
  "context"
  "fmt"
  "strconv"
  "time"

  vshardrouter "github.com/tarantool/go-vshard-router/v2"
  "github.com/tarantool/go-vshard-router/v2/providers/static"

  "github.com/google/uuid"
  "github.com/tarantool/go-tarantool/v2"
)

func main() {
  ctx := context.Background()

  directRouter, err := vshardrouter.NewRouter(ctx, vshardrouter.Config{
    DiscoveryTimeout: time.Minute,
    DiscoveryMode:    vshardrouter.DiscoveryModeOn,
    TopologyProvider: static.NewProvider(map[vshardrouter.ReplicasetInfo][]vshardrouter.InstanceInfo{
      {Name: "replicaset_1", UUID: uuid.New()}: {
        {Addr: "127.0.0.1:1001", Name: "1_1"},
        {Addr: "127.0.0.1:1002", Name: "1_2"},
      },
      {Name: "replicaset_2", UUID: uuid.New()}: {
        {Addr: "127.0.0.1:2001", Name: "2_1"},
        {Addr: "127.0.0.1:2002", Name: "2_2"},
      },
    }),
    TotalBucketCount: 128000,
    PoolOpts: tarantool.Opts{
      Timeout: time.Second,
    },
  })
  if err != nil {
    panic(err)
  }

  user := struct {
    ID uint64
  }{
    ID: 123,
  }

  bucketID := directRouter.BucketIDStrCRC32(strconv.FormatUint(user.ID, 10))

  resp, err := directRouter.Call(
    ctx,
    bucketID,
    vshardrouter.CallModeBRO,
    "storage.api.get_user_info",
    []interface{}{&struct {
      BucketID uint64                 `msgpack:"bucket_id" json:"bucket_id,omitempty"`
      Body     map[string]interface{} `msgpack:"body"`
    }{
      BucketID: bucketID,
      Body: map[string]interface{}{
        "user_id": "123456",
      },
    }}, vshardrouter.CallOpts{Timeout: time.Second * 2},
  )
  if err != nil {
    panic(err)
  }

  info := &struct {
    BirthDay int
  }{}

  err = resp.GetTyped(&[]interface{}{info})
  if err != nil {
    panic(err)
  }

  interfaceResult, err := resp.Get()
  if err != nil {
    panic(err)
  }

  fmt.Printf("interface result: %v\n", interfaceResult)
  fmt.Printf("get typed result: %v\n", info)
}
Providers

We understand that the implementations of loggers, metrics, and topology sources can vary significantly in both implementation and usage. Therefore, go-vshard-router gives you the flexibility to choose the tools you deem appropriate or implement them yourself by using interfaces.

Topology

You can use topology (configuration) providers as the source of router configuration.
Currently, the following providers are supported:

  • etcd (for configurations similar to moonlibs/config in etcd v2 for Tarantool versions below 3)
  • static (for specifying configuration directly in the code for ease of testing)
  • viper
    • etcd v3
    • consul
    • files
Metrics

Metrics providers are also available: you can use ready-made providers that register in Prometheus and pass them to go-vshard-router, so you don't have to think about metric options and settings. The following providers are currently available:

Learn more examples
Quick Start

Learn with the Quick Start, which includes examples and theory.

Customer service

A service built with go-vshard-router on top of the Tarantool example from the original vshard library, using Raft.

Benchmarks

Go Bench
Benchmark                          Runs    Time (ns/op)   Memory (B/op)   Allocations (allocs/op)
BenchmarkCallSimpleInsert_GO-12    14929   77443          2308            34
BenchmarkCallSimpleInsert_Lua-12   10196   116101         1098            19
BenchmarkCallSimpleSelect_GO-12    20065   60521          2534            40
BenchmarkCallSimpleSelect_Lua-12   12064   99874          1153            20
K6

Topology:

  • 4 replicasets (x2 instances per rs)
  • 4 tarantool proxy
  • 1 golang service

Constant VUs scenario, at a load close to production:

select

  • go-vshard-router: slightly worse latency (not critical), but 3 times more RPS
  • tarantool-router: 80% CPU; heavy RPS kills the proxy at 100% CPU

Documentation

Constants

const (
	VShardErrCodeWrongBucket            = 1
	VShardErrCodeNonMaster              = 2
	VShardErrCodeBucketAlreadyExists    = 3
	VShardErrCodeNoSuchReplicaset       = 4
	VShardErrCodeMoveToSelf             = 5
	VShardErrCodeMissingMaster          = 6
	VShardErrCodeTransferIsInProgress   = 7
	VShardErrCodeUnreachableReplicaset  = 8
	VShardErrCodeNoRouteToBucket        = 9
	VShardErrCodeNonEmpty               = 10
	VShardErrCodeUnreachableMaster      = 11
	VShardErrCodeOutOfSync              = 12
	VShardErrCodeHighReplicationLag     = 13
	VShardErrCodeUnreachableReplica     = 14
	VShardErrCodeLowRedundancy          = 15
	VShardErrCodeInvalidRebalancing     = 16
	VShardErrCodeSuboptimalReplica      = 17
	VShardErrCodeUnknownBuckets         = 18
	VShardErrCodeReplicasetIsLocked     = 19
	VShardErrCodeObjectIsOutdated       = 20
	VShardErrCodeRouterAlreadyExists    = 21
	VShardErrCodeBucketIsLocked         = 22
	VShardErrCodeInvalidCfg             = 23
	VShardErrCodeBucketIsPinned         = 24
	VShardErrCodeTooManyReceiving       = 25
	VShardErrCodeStorageIsReferenced    = 26
	VShardErrCodeStorageRefAdd          = 27
	VShardErrCodeStorageRefUse          = 28
	VShardErrCodeStorageRefDel          = 29
	VShardErrCodeBucketRecvDataError    = 30
	VShardErrCodeMultipleMastersFound   = 31
	VShardErrCodeReplicasetInBackoff    = 32
	VShardErrCodeStorageIsDisabled      = 33
	VShardErrCodeBucketIsCorrupted      = 34
	VShardErrCodeRouterIsDisabled       = 35
	VShardErrCodeBucketGCError          = 36
	VShardErrCodeStorageCfgIsInProgress = 37
	VShardErrCodeRouterCfgIsInProgress  = 38
	VShardErrCodeBucketInvalidUpdate    = 39
	VShardErrCodeVhandshakeNotComplete  = 40
	VShardErrCodeInstanceNameMismatch   = 41
)

VShard error codes

const (
	VShardErrNameWrongBucket            = "WRONG_BUCKET"
	VShardErrNameNonMaster              = "NON_MASTER"
	VShardErrNameBucketAlreadyExists    = "BUCKET_ALREADY_EXISTS"
	VShardErrNameNoSuchReplicaset       = "NO_SUCH_REPLICASET"
	VShardErrNameMoveToSelf             = "MOVE_TO_SELF"
	VShardErrNameMissingMaster          = "MISSING_MASTER"
	VShardErrNameTransferIsInProgress   = "TRANSFER_IS_IN_PROGRESS"
	VShardErrNameUnreachableReplicaset  = "UNREACHABLE_REPLICASET"
	VShardErrNameNoRouteToBucket        = "NO_ROUTE_TO_BUCKET"
	VShardErrNameNonEmpty               = "NON_EMPTY"
	VShardErrNameUnreachableMaster      = "UNREACHABLE_MASTER"
	VShardErrNameOutOfSync              = "OUT_OF_SYNC"
	VShardErrNameHighReplicationLag     = "HIGH_REPLICATION_LAG"
	VShardErrNameUnreachableReplica     = "UNREACHABLE_REPLICA"
	VShardErrNameLowRedundancy          = "LOW_REDUNDANCY"
	VShardErrNameInvalidRebalancing     = "INVALID_REBALANCING"
	VShardErrNameSuboptimalReplica      = "SUBOPTIMAL_REPLICA"
	VShardErrNameUnknownBuckets         = "UNKNOWN_BUCKETS"
	VShardErrNameReplicasetIsLocked     = "REPLICASET_IS_LOCKED"
	VShardErrNameObjectIsOutdated       = "OBJECT_IS_OUTDATED"
	VShardErrNameRouterAlreadyExists    = "ROUTER_ALREADY_EXISTS"
	VShardErrNameBucketIsLocked         = "BUCKET_IS_LOCKED"
	VShardErrNameInvalidCfg             = "INVALID_CFG"
	VShardErrNameBucketIsPinned         = "BUCKET_IS_PINNED"
	VShardErrNameTooManyReceiving       = "TOO_MANY_RECEIVING"
	VShardErrNameStorageIsReferenced    = "STORAGE_IS_REFERENCED"
	VShardErrNameStorageRefAdd          = "STORAGE_REF_ADD"
	VShardErrNameStorageRefUse          = "STORAGE_REF_USE"
	VShardErrNameStorageRefDel          = "STORAGE_REF_DEL"
	VShardErrNameBucketRecvDataError    = "BUCKET_RECV_DATA_ERROR"
	VShardErrNameMultipleMastersFound   = "MULTIPLE_MASTERS_FOUND"
	VShardErrNameReplicasetInBackoff    = "REPLICASET_IN_BACKOFF"
	VShardErrNameStorageIsDisabled      = "STORAGE_IS_DISABLED"
	VShardErrNameBucketIsCorrupted      = "BUCKET_IS_CORRUPTED"
	VShardErrNameRouterIsDisabled       = "ROUTER_IS_DISABLED"
	VShardErrNameBucketGCError          = "BUCKET_GC_ERROR"
	VShardErrNameStorageCfgIsInProgress = "STORAGE_CFG_IS_IN_PROGRESS"
	VShardErrNameRouterCfgIsInProgress  = "ROUTER_CFG_IS_IN_PROGRESS"
	VShardErrNameBucketInvalidUpdate    = "BUCKET_INVALID_UPDATE"
	VShardErrNameVhandshakeNotComplete  = "VHANDSHAKE_NOT_COMPLETE"
	VShardErrNameInstanceNameMismatch   = "INSTANCE_NAME_MISMATCH"
)

VShard error names

Variables

var (
	ErrReplicasetExists    = fmt.Errorf("replicaset already exists")
	ErrReplicasetNotExists = fmt.Errorf("replicaset not exists")
)
var (
	// ErrInvalidConfig is returned when the configuration is invalid.
	ErrInvalidConfig = fmt.Errorf("config invalid")
	// ErrInvalidInstanceInfo is returned when the instance information is invalid.
	ErrInvalidInstanceInfo = fmt.Errorf("invalid instance info")
	// ErrInvalidReplicasetInfo is returned when the replicaset information is invalid.
	ErrInvalidReplicasetInfo = fmt.Errorf("invalid replicaset info")
	// ErrTopologyProvider is returned when there is an error from the topology provider.
	ErrTopologyProvider = fmt.Errorf("got error from topology provider")
)

Functions

func BucketIDStrCRC32

func BucketIDStrCRC32(shardKey string, totalBucketCount uint64) uint64

func CalculateEtalonBalance

func CalculateEtalonBalance(replicasets []Replicaset, bucketCount uint64) error

CalculateEtalonBalance computes the ideal bucket count for each replicaset. This iterative algorithm seeks the optimal balance within a cluster by calculating the ideal bucket count for each replicaset at every step. If the ideal count cannot be achieved due to pinned buckets, the algorithm makes a best effort to approximate balance by ignoring the replicaset with pinned buckets and its associated pinned count. After each iteration, a new balance is recalculated. However, this can lead to scenarios where the conditions are still unmet; ignoring pinned buckets in overloaded replicasets can reduce the ideal bucket count in others, potentially causing new values to fall below their pinned count.

At each iteration, the algorithm either concludes or disregards at least one new overloaded replicaset. Therefore, its time complexity is O(N^2), where N is the number of replicasets. The implementation is based on https://github.com/tarantool/vshard/blob/99ceaee014ea3a67424c2026545838e08d69b90c/vshard/replicaset.lua#L1358
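
The iteration described above can be sketched in plain Go. This is a simplified illustration modeled on the linked Lua code, not the library's implementation: the rsBalance type, its field names, and the etalonBalance function are hypothetical stand-ins, and counts are signed integers to keep the arithmetic simple.

```go
package main

import (
	"errors"
	"fmt"
	"math"
)

// rsBalance is a hypothetical stand-in for the fields the algorithm uses.
type rsBalance struct {
	Name              string
	Weight            float64
	PinnedCount       int64
	IgnoreDisbalance  bool
	EtalonBucketCount int64
}

// etalonBalance distributes bucketCount proportionally to Weight and repeats
// the distribution whenever a share falls below a replicaset's pinned count.
func etalonBalance(rss []*rsBalance, bucketCount int64) error {
	weightSum := 0.0
	for _, rs := range rss {
		weightSum += rs.Weight
	}
	// Each pass either finishes or excludes at least one replicaset,
	// so len(rss)+1 passes are always enough.
	for pass := 0; pass <= len(rss); pass++ {
		if weightSum <= 0 || bucketCount < 0 {
			return errors.New("cannot balance: no weight or buckets left")
		}
		perWeight := float64(bucketCount) / weightSum
		var calculated int64
		for _, rs := range rss {
			if !rs.IgnoreDisbalance {
				rs.EtalonBucketCount = int64(math.Ceil(rs.Weight * perWeight))
				calculated += rs.EtalonBucketCount
			}
		}
		rest := calculated - bucketCount // surplus caused by rounding up
		done := true
		for _, rs := range rss {
			if rs.IgnoreDisbalance {
				continue
			}
			share := rs.Weight * perWeight
			if rs.EtalonBucketCount > 0 && rest > 0 && math.Ceil(share) != math.Floor(share) {
				rs.EtalonBucketCount--
				rest--
			}
			if rs.EtalonBucketCount < rs.PinnedCount {
				// Pinned buckets cannot move: fix this replicaset at its
				// pinned count and rebalance the others without it.
				rs.IgnoreDisbalance = true
				rs.EtalonBucketCount = rs.PinnedCount
				weightSum -= rs.Weight
				bucketCount -= rs.PinnedCount
				done = false
			}
		}
		if done {
			return nil
		}
	}
	return errors.New("balance did not converge")
}

func main() {
	rss := []*rsBalance{
		{Name: "rs_1", Weight: 1, PinnedCount: 60},
		{Name: "rs_2", Weight: 1},
		{Name: "rs_3", Weight: 1},
	}
	if err := etalonBalance(rss, 90); err != nil {
		panic(err)
	}
	for _, rs := range rss {
		fmt.Printf("%s: etalon=%d ignored=%v\n", rs.Name, rs.EtalonBucketCount, rs.IgnoreDisbalance)
	}
}
```

In the example, rs_1 pins more buckets than its proportional share, so it is excluded after the first pass and the remaining buckets are split between the other two replicasets.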

func RouterMapCallRW

func RouterMapCallRW[T any](r *Router, ctx context.Context,
	fnc string, args interface{}, opts RouterMapCallRWOptions,
) (map[string]T, error)

RouterMapCallRW is a consistent Map-Reduce. The given function is called on all masters in the cluster with a guarantee that, in case of success, it was executed with all buckets being accessible for reads and writes. T is the return type of the user-defined function 'fnc'. It is defined as a distinct function rather than a Router method because of Go limitations, see: https://github.com/golang/go/issues/49085.
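
The Go limitation mentioned above is that a method cannot declare its own type parameters, so a typed per-replicaset result has to come from a package-level generic function that takes the router as an argument. The sketch below shows only that shape; the router type and mapCall function are hypothetical, and the consistency protocol of the real RouterMapCallRW is not modeled.

```go
package main

import "fmt"

// router is a hypothetical stand-in that only knows replicaset names.
type router struct {
	replicasets []string
}

// mapCall is a package-level generic function. Writing it as
// "func (r *router) mapCall[T any](...)" would not compile, because
// Go methods cannot introduce type parameters.
func mapCall[T any](r *router, call func(rsName string) (T, error)) (map[string]T, error) {
	out := make(map[string]T, len(r.replicasets))
	for _, name := range r.replicasets {
		v, err := call(name)
		if err != nil {
			return nil, fmt.Errorf("replicaset %s: %w", name, err)
		}
		out[name] = v
	}
	return out, nil
}

func main() {
	r := &router{replicasets: []string{"rs_1", "rs_2"}}
	// T is inferred as int from the callback's return type.
	counts, err := mapCall(r, func(rsName string) (int, error) {
		return len(rsName), nil
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(counts)
}
```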

Types

type BucketStatInfo

type BucketStatInfo struct {
	BucketID uint64 `msgpack:"id"`
	Status   string `msgpack:"status"`
}

func (*BucketStatInfo) DecodeMsgpack

func (bsi *BucketStatInfo) DecodeMsgpack(d *msgpack.Decoder) error

Tarantool vshard storage returns a map with 'int' keys for bucketStatInfo, for example: map[id:48 status:active 1:48 2:active]. But msgpack v5 supports only string keys when decoding maps into structs, see issue: https://github.com/vmihailenco/msgpack/issues/372. To work around this, BucketStatInfo is decoded manually. Once the issue above is resolved, this code can (and should) be deleted.
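
To illustrate the idea of manual decoding, the sketch below reads only the string keys from an already decoded generic map and ignores the numeric ones. It is an assumption-laden simplification: the real method works on a msgpack.Decoder, while bucketStat, toUint64, and bucketStatFromMap here are hypothetical helpers built on the standard library only.

```go
package main

import (
	"errors"
	"fmt"
)

// bucketStat is a local stand-in for BucketStatInfo.
type bucketStat struct {
	BucketID uint64
	Status   string
}

// toUint64 accepts the integer types a generic decoder may produce.
func toUint64(v any) (uint64, bool) {
	switch n := v.(type) {
	case uint64:
		return n, true
	case int64:
		if n < 0 {
			return 0, false
		}
		return uint64(n), true
	case int:
		if n < 0 {
			return 0, false
		}
		return uint64(n), true
	}
	return 0, false
}

// bucketStatFromMap picks the "id" and "status" entries and skips
// everything else, including the duplicate integer keys 1 and 2.
func bucketStatFromMap(m map[any]any) (bucketStat, error) {
	var bs bucketStat
	id, ok := toUint64(m["id"])
	if !ok {
		return bs, errors.New("bucket stat: missing or invalid id")
	}
	status, ok := m["status"].(string)
	if !ok {
		return bs, errors.New("bucket stat: missing or invalid status")
	}
	bs.BucketID, bs.Status = id, status
	return bs, nil
}

func main() {
	raw := map[any]any{"id": int64(48), "status": "active", 1: int64(48), 2: "active"}
	bs, err := bucketStatFromMap(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", bs)
}
```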

type BucketsSearchMode

type BucketsSearchMode int

BucketsSearchMode is a type that defines the policy for the Router.Route method. See type Config for further details.

const (
	// BucketsSearchLegacy implements the same logic as lua router:
	// send bucket_stat request to every replicaset,
	// return a response immediately if any of them succeed.
	BucketsSearchLegacy BucketsSearchMode = iota
	// BucketsSearchBatchedQuick and BucketsSearchBatchedFull implement another logic:
	// send buckets_discovery request to every replicaset with from=bucketID,
	// seek our bucketID in their responses.
	// Additionally, store other bucketIDs in the route map.
	// BucketsSearchBatchedQuick stops iterating over replicasets responses as soon as our bucketID is found.
	BucketsSearchBatchedQuick
	// BucketsSearchBatchedFull implements the same logic as BucketsSearchBatchedQuick,
	// but doesn't stop iterating over replicasets responses as soon as our bucketID is found.
	// Instead, it always iterates over all replicasets responses even if bucketID is found.
	BucketsSearchBatchedFull
)
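
The difference between the two batched modes can be sketched as follows. This is a simplified model under stated assumptions: responses are processed sequentially from a slice, and discoveryResponse and searchBatched are hypothetical names, not part of the library.

```go
package main

import "fmt"

// discoveryResponse is a hypothetical model of one replicaset's answer
// to a buckets_discovery request.
type discoveryResponse struct {
	Replicaset string
	Buckets    []uint64
}

// searchBatched records every bucket it sees in the route map.
// With stopEarly (the "quick" behaviour) it stops after the response
// that contains bucketID; otherwise (the "full" behaviour) it walks
// all responses.
func searchBatched(responses []discoveryResponse, bucketID uint64, stopEarly bool) (string, map[uint64]string) {
	routeMap := make(map[uint64]string)
	found := ""
	for _, resp := range responses {
		for _, b := range resp.Buckets {
			routeMap[b] = resp.Replicaset
			if b == bucketID {
				found = resp.Replicaset
			}
		}
		if stopEarly && found != "" {
			break
		}
	}
	return found, routeMap
}

func main() {
	responses := []discoveryResponse{
		{Replicaset: "rs_1", Buckets: []uint64{1, 2, 3}},
		{Replicaset: "rs_2", Buckets: []uint64{4, 5, 6}},
	}
	rs, quick := searchBatched(responses, 2, true)
	_, full := searchBatched(responses, 2, false)
	fmt.Println(rs, len(quick), len(full))
}
```

The quick variant answers sooner, while the full variant leaves a more complete route map behind for later requests.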

type CallMode

type CallMode int

CallMode is a type to represent call mode for Router.Call method.

const (
	// CallModeRO sets a read-only mode for Router.Call.
	CallModeRO CallMode = iota
	// CallModeRW sets a read-write mode for Router.Call.
	CallModeRW
	// CallModeRE acts like CallModeRO
	// with preference for a replica rather than a master.
	// This mode is not supported yet.
	CallModeRE
	// CallModeBRO acts like CallModeRO with balancing.
	CallModeBRO
	// CallModeBRE acts like CallModeRO with balancing
	// and preference for a replica rather than a master.
	CallModeBRE
)

type CallOpts

type CallOpts struct {
	Timeout time.Duration
}

type CallRequest

type CallRequest struct {
	// contains filtered or unexported fields
}

CallRequest helps you to create a call request object for execution by a Connection.

func NewCallRequest

func NewCallRequest(function string) *CallRequest

NewCallRequest returns a new empty CallRequest.

func (*CallRequest) Args

func (req *CallRequest) Args(args interface{}) *CallRequest

Args sets the args for the call request. Note: default value is empty.

func (*CallRequest) BucketID

func (req *CallRequest) BucketID(bucketID uint64) *CallRequest

BucketID sets the bucket ID for the request. You can omit it if a BucketGetter is configured; when both are present, the value set by this method takes priority.

func (*CallRequest) Context

func (req *CallRequest) Context(ctx context.Context) *CallRequest

Context sets a passed context to the request.

type CallResponse

type CallResponse struct {
	// contains filtered or unexported fields
}

CallResponse is a backwards-compatible structure with go-tarantool for easier replacement.

func (*CallResponse) Get

func (resp *CallResponse) Get() ([]interface{}, error)

Get waits synchronously for the response. The interface was created purely for convenient migration from go-tarantool to go-vshard-router.

func (*CallResponse) GetTyped

func (resp *CallResponse) GetTyped(result interface{}) error

GetTyped waits synchronously for response and calls msgpack.Decoder.Decode(result) if no error happens.

type Config

type Config struct {
	// Providers
	// Loggerf injects a custom logger. By default, no logger is used.
	Loggerf          LogfProvider     // Loggerf is not required
	Metrics          MetricsProvider  // Metrics is not required
	TopologyProvider TopologyProvider // TopologyProvider is required provider

	// Discovery
	// DiscoveryTimeout is timeout between cron discovery job; by default there is no timeout.
	DiscoveryTimeout time.Duration
	DiscoveryMode    DiscoveryMode
	// DiscoveryWorkStep is a pause between calling buckets_discovery on storage
	// in buckets discovering logic. Default is 10ms.
	DiscoveryWorkStep time.Duration

	// BucketsSearchMode defines policy for Router.Route method.
	// Default value is BucketsSearchLegacy.
	// See BucketsSearchMode constants for more detail.
	BucketsSearchMode BucketsSearchMode

	TotalBucketCount uint64
	User             string
	Password         string
	PoolOpts         tarantool.Opts

	// BucketGetter is an optional argument.
	// You can specify a function that will receive the bucket id from the context.
	// This is useful if you use middleware that inserts the calculated bucket id into the request context.
	BucketGetter func(ctx context.Context) uint64
	// RequestTimeout is the timeout for requests to Tarantool.
	// Don't rely on this timeout.
	// It is distinct from the timeout of the library itself,
	// that is, the retry timeout applied when, for example, buckets move.
	// Currently, it only works for sugar implementations.
	RequestTimeout time.Duration
}
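
As an illustration of the BucketGetter field, the sketch below stores a bucket ID in a context and reads it back with a function of the same signature, func(ctx context.Context) uint64. The key type and helper names are hypothetical, and how the router treats a zero result is not covered here.

```go
package main

import (
	"context"
	"fmt"
)

// bucketIDKey is an unexported context key type, which avoids collisions
// with keys defined in other packages.
type bucketIDKey struct{}

// withBucketID is what a middleware could call after computing the bucket ID.
func withBucketID(ctx context.Context, id uint64) context.Context {
	return context.WithValue(ctx, bucketIDKey{}, id)
}

// bucketGetter has the signature expected by Config.BucketGetter.
// It returns 0 when the context carries no bucket ID.
func bucketGetter(ctx context.Context) uint64 {
	id, _ := ctx.Value(bucketIDKey{}).(uint64)
	return id
}

// roundTrip stores an ID and reads it back through the getter.
func roundTrip(id uint64) uint64 {
	return bucketGetter(withBucketID(context.Background(), id))
}

// missing reads from a context that never had a bucket ID attached.
func missing() uint64 {
	return bucketGetter(context.Background())
}

func main() {
	fmt.Println(roundTrip(42), missing())
}
```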

type DiscoveryMode

type DiscoveryMode int
const (
	// DiscoveryModeOn is cron discovery with cron timeout
	DiscoveryModeOn DiscoveryMode = iota
	DiscoveryModeOnce
)

type EmptyMetrics

type EmptyMetrics struct{}

EmptyMetrics is the default no-op metrics provider. You can embed this type and implement only the metrics you need.

func (*EmptyMetrics) CronDiscoveryEvent

func (e *EmptyMetrics) CronDiscoveryEvent(_ bool, _ time.Duration, _ string)

func (*EmptyMetrics) RequestDuration

func (e *EmptyMetrics) RequestDuration(_ time.Duration, _ string, _, _ bool)

func (*EmptyMetrics) RetryOnCall

func (e *EmptyMetrics) RetryOnCall(_ string)
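
The embedding approach can be sketched with local stand-ins. The metricsProvider interface below copies the MetricsProvider method set shown on this page; emptyMetrics and retryCounter are hypothetical types, and the counter is not synchronized, which a real provider would need.

```go
package main

import (
	"fmt"
	"time"
)

// metricsProvider mirrors the MetricsProvider method set.
type metricsProvider interface {
	CronDiscoveryEvent(ok bool, duration time.Duration, reason string)
	RetryOnCall(reason string)
	RequestDuration(duration time.Duration, procedure string, ok, mapReduce bool)
}

// emptyMetrics is a local stand-in for EmptyMetrics: every method is a no-op.
type emptyMetrics struct{}

func (e *emptyMetrics) CronDiscoveryEvent(_ bool, _ time.Duration, _ string) {}
func (e *emptyMetrics) RetryOnCall(_ string)                                 {}
func (e *emptyMetrics) RequestDuration(_ time.Duration, _ string, _, _ bool) {}

// retryCounter embeds the no-op provider and overrides only RetryOnCall.
// Note: the map is not protected by a mutex in this sketch.
type retryCounter struct {
	emptyMetrics
	retries map[string]int
}

func (c *retryCounter) RetryOnCall(reason string) {
	c.retries[reason]++
}

// Compile-time check that the embedding satisfies the full interface.
var _ metricsProvider = (*retryCounter)(nil)

func main() {
	c := &retryCounter{retries: make(map[string]int)}
	var m metricsProvider = c
	m.RetryOnCall("bucket moved")
	m.RequestDuration(time.Millisecond, "storage.api.get_user_info", true, false)
	fmt.Println(c.retries)
}
```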

type InstanceInfo

type InstanceInfo struct {
	// Name is a required field for the instance.
	// Starting with Tarantool 3.0, this definition is made into a human-readable name,
	// and it is now mandatory for use in the library.
	// The Name should be a unique identifier for the instance.
	Name string

	// Addr specifies the address of the instance.
	// This can be an IP address or a domain name, depending on how the instance is accessed.
	// It is necessary for connecting to the instance or identifying its location.
	Addr string

	// UUID is an optional field that provides a globally unique identifier (UUID) for the instance.
	// While this information is not mandatory, it can be useful for internal management or tracking purposes.
	// The UUID ensures that each instance can be identified uniquely, but it is not required for basic operations.
	UUID uuid.UUID
}

InstanceInfo represents the information about an instance. This struct holds the necessary details such as the name, address, and UUID of the instance.

func (InstanceInfo) String

func (ii InstanceInfo) String() string

func (InstanceInfo) Validate

func (ii InstanceInfo) Validate() error

type LogfProvider

type LogfProvider interface {
	Debugf(ctx context.Context, format string, v ...any)
	Infof(ctx context.Context, format string, v ...any)
	Warnf(ctx context.Context, format string, v ...any)
	Errorf(ctx context.Context, format string, v ...any)
}

LogfProvider is an interface for injecting a custom logger.

func NewSlogLogger added in v2.0.2

func NewSlogLogger(logger *slog.Logger) LogfProvider

NewSlogLogger wraps a slog logger.
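
A custom logger only needs the four methods of LogfProvider. The sketch below adapts the standard library's log.Logger; the logfProvider interface copies the method set shown above, while stdLogger and demo are hypothetical names used for illustration.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log"
)

// logfProvider mirrors the LogfProvider method set.
type logfProvider interface {
	Debugf(ctx context.Context, format string, v ...any)
	Infof(ctx context.Context, format string, v ...any)
	Warnf(ctx context.Context, format string, v ...any)
	Errorf(ctx context.Context, format string, v ...any)
}

// stdLogger adapts a *log.Logger by prefixing each message with its level.
type stdLogger struct {
	l *log.Logger
}

func (s stdLogger) logf(level, format string, v ...any) {
	s.l.Print(level + " " + fmt.Sprintf(format, v...))
}

func (s stdLogger) Debugf(_ context.Context, format string, v ...any) { s.logf("DEBUG", format, v...) }
func (s stdLogger) Infof(_ context.Context, format string, v ...any)  { s.logf("INFO", format, v...) }
func (s stdLogger) Warnf(_ context.Context, format string, v ...any)  { s.logf("WARN", format, v...) }
func (s stdLogger) Errorf(_ context.Context, format string, v ...any) { s.logf("ERROR", format, v...) }

// Compile-time check that the adapter satisfies the interface.
var _ logfProvider = stdLogger{}

// demo logs one line into a buffer (no timestamp flags) and returns it.
func demo() string {
	var buf bytes.Buffer
	var logger logfProvider = stdLogger{l: log.New(&buf, "", 0)}
	logger.Infof(context.Background(), "bucket %d found on %s", 42, "rs_1")
	return buf.String()
}

func main() {
	fmt.Print(demo())
}
```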

type MetricsProvider

type MetricsProvider interface {
	CronDiscoveryEvent(ok bool, duration time.Duration, reason string)
	RetryOnCall(reason string)
	RequestDuration(duration time.Duration, procedure string, ok, mapReduce bool)
}

MetricsProvider is an interface for passing library metrics to your prometheus/graphite and other metrics. This logic is experimental and may be changed in the release.

type Pooler added in v2.0.5

type Pooler interface {
	pool.Pooler
	// GetInfo is an addition to the standard interface that allows you
	// to get the current state of the connection pool.
	// This is necessary for proper operation with topology providers
	// for adding or removing instances.
	GetInfo() map[string]pool.ConnectionInfo
}

Pooler is an interface for the tarantool.Pool wrapper, which is necessary for the correct operation of the library.

type Replicaset

type Replicaset struct {
	EtalonBucketCount uint64
	// contains filtered or unexported fields
}

func (*Replicaset) BucketForceCreate

func (rs *Replicaset) BucketForceCreate(ctx context.Context, firstBucketID, count uint64) error

func (*Replicaset) BucketStat

func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketStatInfo, error)

func (*Replicaset) BucketsCount

func (rs *Replicaset) BucketsCount(ctx context.Context) (uint64, error)

func (*Replicaset) CallAsync

func (rs *Replicaset) CallAsync(ctx context.Context, opts ReplicasetCallOpts, fnc string, args interface{}) *tarantool.Future

CallAsync sends async request to remote storage

func (*Replicaset) Pooler

func (rs *Replicaset) Pooler() pool.Pooler

func (*Replicaset) String

func (rs *Replicaset) String() string

type ReplicasetCallOpts

type ReplicasetCallOpts struct {
	PoolMode pool.Mode
	Timeout  time.Duration
}

type ReplicasetInfo

type ReplicasetInfo struct {
	// Name — the name of the replicaset.
	// This string is required and is used to identify the replicaset.
	Name string
	// UUID — the unique identifier of the replica.
	// This is an optional value that can be used to uniquely distinguish each replicaset.
	UUID uuid.UUID
	// Weight — the weight of the replicaset.
	// This floating-point number may be used to determine the importance or priority of the replicaset.
	Weight float64
	// PinnedCount — the number of pinned items.
	// This value indicates how many items or tasks are associated with the replicaset.
	PinnedCount uint64
	// IgnoreDisbalance — a flag indicating whether to ignore load imbalance when distributing tasks.
	// If true, the replicaset will be excluded from imbalance checks.
	IgnoreDisbalance bool
}

ReplicasetInfo represents information about a replicaset, including its name, unique identifier, weight, and state.

func (ReplicasetInfo) String

func (ri ReplicasetInfo) String() string

func (ReplicasetInfo) Validate

func (ri ReplicasetInfo) Validate() error

type Router

type Router struct {
	// contains filtered or unexported fields
}

func NewRouter

func NewRouter(ctx context.Context, cfg Config) (*Router, error)

NewRouter - the main library function. It creates a Router for direct request routing in a sharded Tarantool cluster. It connects to masters and replicas for load distribution and fault tolerance.

func (*Router) AddInstance

func (r *Router) AddInstance(ctx context.Context, rsName string, info InstanceInfo) error

func (*Router) AddReplicaset

func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error

func (*Router) AddReplicasets

func (r *Router) AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error

func (*Router) BucketCount

func (r *Router) BucketCount() uint64

BucketCount returns the total number of buckets specified in cfg.

func (*Router) BucketIDStrCRC32

func (r *Router) BucketIDStrCRC32(shardKey string) uint64

BucketIDStrCRC32 returns the bucket identifier computed from the shard key.
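
A shard key can be mapped to a bucket roughly as sketched below. This is a hypothetical re-implementation for illustration: it assumes the CRC-32C (Castagnoli) polynomial, which is what Tarantool's digest.crc32 uses, and 1-based bucket IDs. Check the library source before relying on identical results.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// castagnoli is the CRC-32C table (assumed polynomial, see the note above).
var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// bucketIDStrCRC32 hashes the shard key and folds the checksum into the
// range [1, totalBucketCount]. totalBucketCount must be greater than zero.
func bucketIDStrCRC32(shardKey string, totalBucketCount uint64) uint64 {
	sum := crc32.Checksum([]byte(shardKey), castagnoli)
	return uint64(sum)%totalBucketCount + 1
}

func main() {
	const total = 128000
	for _, key := range []string{"123", "alice", "order:42"} {
		fmt.Printf("%q -> bucket %d\n", key, bucketIDStrCRC32(key, total))
	}
}
```

Because the result depends only on the key and the total bucket count, every client that uses the same parameters routes a given key to the same bucket.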

func (*Router) BucketReset

func (r *Router) BucketReset(bucketID uint64)

func (*Router) BucketSet

func (r *Router) BucketSet(bucketID uint64, rsName string) (*Replicaset, error)

BucketSet assigns a bucket to a replicaset.

func (*Router) Call

func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
	fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error)

Call calls the function identified by 'fnc' on the shard storing the bucket identified by 'bucketID'.

func (*Router) CallBRE

func (r *Router) CallBRE(ctx context.Context, bucketID uint64,
	fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error)

CallBRE is an alias for Call with CallModeBRE.

func (*Router) CallBRO

func (r *Router) CallBRO(ctx context.Context, bucketID uint64,
	fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error)

CallBRO is an alias for Call with CallModeBRO.

func (*Router) CallRE

func (r *Router) CallRE(ctx context.Context, bucketID uint64,
	fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error)

CallRE is an alias for Call with CallModeRE.

func (*Router) CallRO

func (r *Router) CallRO(ctx context.Context, bucketID uint64,
	fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error)

CallRO is an alias for Call with CallModeRO.

func (*Router) CallRW

func (r *Router) CallRW(ctx context.Context, bucketID uint64,
	fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error)

CallRW is an alias for Call with CallModeRW.

func (*Router) ClusterBootstrap

func (r *Router) ClusterBootstrap(ctx context.Context, ifNotBootstrapped bool) error

ClusterBootstrap initializes the cluster by bootstrapping the necessary buckets across the available replicasets. It checks the current state of each replicaset and creates buckets if required. The function takes a context for managing cancellation and deadlines, and a boolean parameter ifNotBootstrapped to control error handling. If ifNotBootstrapped is true, the function will log any errors encountered during the bootstrapping process but will not halt execution; instead, it will return the last error encountered. If ifNotBootstrapped is false, any error will result in an immediate return, ensuring that the operation either succeeds fully or fails fast.

Deprecated: use the Lua bootstrap instead; the Go router bootstrap currently works incorrectly.

func (*Router) DiscoveryAllBuckets

func (r *Router) DiscoveryAllBuckets(ctx context.Context) error

func (*Router) DiscoveryHandleBuckets

func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buckets []uint64)

DiscoveryHandleBuckets arranges the downloaded buckets in the route map so that they reference the given replicaset.

func (*Router) Do

func (r *Router) Do(req *CallRequest, userMode pool.Mode) *CallResponse

Do performs a request synchronously on the connection. Note that the logic of this method differs from that of go-tarantool.

func (*Router) RemoveInstance

func (r *Router) RemoveInstance(ctx context.Context, rsName, instanceName string) error

RemoveInstance removes a specific instance from the router topology within a replicaset. It takes a context, the replicaset name (rsName), and the instance name (instanceName) as inputs. If the replicaset name is empty, it searches through all replica sets to locate the instance. Returns an error if the specified replicaset does not exist or if any issue occurs during removal.

func (*Router) RemoveReplicaset

func (r *Router) RemoveReplicaset(ctx context.Context, rsName string) []error

func (*Router) Route

func (r *Router) Route(ctx context.Context, bucketID uint64) (*Replicaset, error)

Route returns the replicaset object for a bucket identifier.

func (*Router) RouteAll

func (r *Router) RouteAll() map[string]*Replicaset

RouteAll returns a map of all replicasets.

func (*Router) RouteMapClean

func (r *Router) RouteMapClean()

func (*Router) Topology

func (r *Router) Topology() TopologyController

type RouterMapCallRWOptions

type RouterMapCallRWOptions struct {
	// Timeout defines timeout for RouterMapCallRW.
	Timeout time.Duration
}

RouterMapCallRWOptions sets options for RouterMapCallRW.

type SlogLoggerf added in v2.0.2

type SlogLoggerf struct {
	Logger *slog.Logger
}

SlogLoggerf is an adapter from slog to the LogfProvider interface.

func (*SlogLoggerf) Debugf added in v2.0.2

func (s *SlogLoggerf) Debugf(ctx context.Context, format string, v ...any)

Debugf implements Debugf method for LogfProvider interface

func (SlogLoggerf) Errorf added in v2.0.2

func (s SlogLoggerf) Errorf(ctx context.Context, format string, v ...any)

Errorf implements Errorf method for LogfProvider interface

func (SlogLoggerf) Infof added in v2.0.2

func (s SlogLoggerf) Infof(ctx context.Context, format string, v ...any)

Infof implements Infof method for LogfProvider interface

func (SlogLoggerf) Warnf added in v2.0.2

func (s SlogLoggerf) Warnf(ctx context.Context, format string, v ...any)

Warnf implements Warnf method for LogfProvider interface

type StdoutLogLevel

type StdoutLogLevel int

StdoutLogLevel is a type to control log level for StdoutLoggerf.

const (
	// StdoutLogDefault is equal to default value of StdoutLogLevel. Acts like StdoutLogInfo.
	StdoutLogDefault StdoutLogLevel = iota
	// StdoutLogDebug enables debug or higher level logs for StdoutLoggerf
	StdoutLogDebug
	// StdoutLogInfo enables only info or higher level logs for StdoutLoggerf
	StdoutLogInfo
	// StdoutLogWarn enables only warn or higher level logs for StdoutLoggerf
	StdoutLogWarn
	// StdoutLogError enables error level logs for StdoutLoggerf
	StdoutLogError
)

type StdoutLoggerf

type StdoutLoggerf struct {
	// LogLevel controls log level to print, see StdoutLogLevel constants for details.
	LogLevel StdoutLogLevel
}

StdoutLoggerf is a logger that prints to stderr.

func (StdoutLoggerf) Debugf

func (s StdoutLoggerf) Debugf(_ context.Context, format string, v ...any)

Debugf implements Debugf method for LogfProvider interface

func (StdoutLoggerf) Errorf

func (s StdoutLoggerf) Errorf(_ context.Context, format string, v ...any)

Errorf implements Errorf method for LogfProvider interface

func (StdoutLoggerf) Infof

func (s StdoutLoggerf) Infof(_ context.Context, format string, v ...any)

Infof implements Infof method for LogfProvider interface

func (StdoutLoggerf) Warnf

func (s StdoutLoggerf) Warnf(_ context.Context, format string, v ...any)

Warnf implements Warnf method for LogfProvider interface

type StorageCallVShardError

type StorageCallVShardError struct {
	BucketID uint64 `msgpack:"bucket_id"`
	Reason   string `msgpack:"reason"`
	Code     int    `msgpack:"code"`
	Type     string `msgpack:"type"`
	Message  string `msgpack:"message"`
	Name     string `msgpack:"name"`
	// The 3 fields below are sent as strings by vshard storage, so we decode them into string, not uuid.UUID type
	// Example: 00000000-0000-0002-0002-000000000000
	MasterUUID     string `msgpack:"master"`
	ReplicasetUUID string `msgpack:"replicaset"`
	ReplicaUUID    string `msgpack:"replica"`
	Destination    string `msgpack:"destination"`
}

func (StorageCallVShardError) Error

func (s StorageCallVShardError) Error() string
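
Callers can combine the numeric error codes with errors.As to decide how to react to a storage error. The sketch below uses local stand-ins: storageError models a few fields of StorageCallVShardError, the constants copy values from the VShardErrCode list on this page, and the refresh policy in shouldRefreshRoute is purely illustrative. Whether Router.Call returns this error type by value is also an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Values copied from the VShardErrCode constants on this page.
const (
	errCodeWrongBucket          = 1
	errCodeNonMaster            = 2
	errCodeTransferIsInProgress = 7
)

// storageError is a reduced local stand-in for StorageCallVShardError.
type storageError struct {
	BucketID uint64
	Code     int
	Name     string
	Message  string
}

func (e storageError) Error() string {
	return fmt.Sprintf("%s (code %d): %s", e.Name, e.Code, e.Message)
}

// shouldRefreshRoute is a hypothetical policy: treat "bucket is elsewhere"
// style errors as a hint that the cached route is stale.
func shouldRefreshRoute(err error) bool {
	var se storageError
	if !errors.As(err, &se) {
		return false
	}
	switch se.Code {
	case errCodeWrongBucket, errCodeTransferIsInProgress:
		return true
	}
	return false
}

// wrapCall mimics a caller adding context to an error with %w.
func wrapCall(err error) error {
	return fmt.Errorf("call failed: %w", err)
}

func main() {
	err := wrapCall(storageError{BucketID: 48, Code: errCodeWrongBucket, Name: "WRONG_BUCKET", Message: "bucket moved"})
	fmt.Println(err, "| refresh route:", shouldRefreshRoute(err))
}
```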

type TopologyController

type TopologyController interface {
	AddInstance(ctx context.Context, rsName string, info InstanceInfo) error
	RemoveReplicaset(ctx context.Context, rsName string) []error
	RemoveInstance(ctx context.Context, rsName, instanceName string) error
	AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error
	AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error
}

TopologyController is an entity that allows you to interact with the topology. TopologyController is not safe for concurrent use. This is intentional: there is no point in providing concurrency safety for this case, and a caller can always use their own external synchronization primitive to handle concurrent access.
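
One way to add external synchronization is a thin wrapper that takes a mutex around every call. The sketch below is an illustration under assumptions: instanceInfo and instanceController are reduced, hypothetical versions of InstanceInfo and TopologyController (two of its five methods), and memController is a toy implementation used only to exercise the wrapper.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// instanceInfo is a reduced stand-in for InstanceInfo.
type instanceInfo struct{ Name, Addr string }

// instanceController is a reduced stand-in for TopologyController.
type instanceController interface {
	AddInstance(ctx context.Context, rsName string, info instanceInfo) error
	RemoveInstance(ctx context.Context, rsName, instanceName string) error
}

// lockedController serializes every call to the wrapped controller.
type lockedController struct {
	mu    sync.Mutex
	inner instanceController
}

func (l *lockedController) AddInstance(ctx context.Context, rsName string, info instanceInfo) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.inner.AddInstance(ctx, rsName, info)
}

func (l *lockedController) RemoveInstance(ctx context.Context, rsName, instanceName string) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.inner.RemoveInstance(ctx, rsName, instanceName)
}

// memController is a toy controller backed by an unsynchronized map.
type memController struct {
	instances map[string]instanceInfo
}

func (m *memController) AddInstance(_ context.Context, rsName string, info instanceInfo) error {
	m.instances[rsName+"/"+info.Name] = info
	return nil
}

func (m *memController) RemoveInstance(_ context.Context, rsName, instanceName string) error {
	delete(m.instances, rsName+"/"+instanceName)
	return nil
}

// addConcurrently adds n instances from n goroutines through the wrapper
// and returns how many ended up registered.
func addConcurrently(n int) int {
	mem := &memController{instances: make(map[string]instanceInfo)}
	ctrl := &lockedController{inner: mem}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			info := instanceInfo{Name: fmt.Sprintf("inst_%d", i), Addr: fmt.Sprintf("127.0.0.1:%d", 3000+i)}
			_ = ctrl.AddInstance(context.Background(), "rs_1", info)
		}(i)
	}
	wg.Wait()
	return len(mem.instances)
}

func main() {
	fmt.Println("registered instances:", addConcurrently(50))
}
```

Without the mutex, the concurrent map writes in the toy controller would be a data race; the wrapper makes the calls safe without changing the wrapped type.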

type TopologyProvider

type TopologyProvider interface {
	// Init should create the current topology at the beginning
	// and change the state during the process of changing the point of receiving the cluster configuration
	Init(t TopologyController) error
	// Close closes all connections if the provider created them
	Close()
}

TopologyProvider is an external module that can look up the current topology of the cluster. It might be backed by etcd, a config file, Consul, or something else.

type VshardMode

type VshardMode string
const (
	ReadMode  VshardMode = "read"
	WriteMode VshardMode = "write"
)

func (VshardMode) String

func (c VshardMode) String() string

type VshardRouterCallResp

type VshardRouterCallResp struct {
	// contains filtered or unexported fields
}

VshardRouterCallResp represents a response from Router.Call[XXX] methods.

func (VshardRouterCallResp) Get

func (r VshardRouterCallResp) Get() ([]interface{}, error)

Get returns a response from user defined function as []interface{}.

func (VshardRouterCallResp) GetTyped

func (r VshardRouterCallResp) GetTyped(result interface{}) error

GetTyped decodes a response from user defined function into custom values.

Directories

Path Synopsis
mocks
providers
etcd
Package etcd based on moonlibs config library https://github.com/moonlibs/config?tab=readme-ov-file#multi-shard-topology-for-custom-sharding-etcdclustermaster
