Documentation
¶
Index ¶
- Constants
- Variables
- func BucketIDStrCRC32(shardKey string, totalBucketCount uint64) uint64
- func CalculateEtalonBalance(replicasets []Replicaset, bucketCount uint64) error
- func RouterMapCallRW[T any](r *Router, ctx context.Context, fnc string, args interface{}, ...) (map[string]T, error)
- type BucketStatInfo
- type BucketsSearchMode
- type CallMode
- type CallOpts
- type CallRequest
- type CallResponse
- type Config
- type DiscoveryMode
- type EmptyMetrics
- type InstanceInfo
- type LogfProvider
- type MetricsProvider
- type Pooler
- type Replicaset
- func (rs *Replicaset) BucketForceCreate(ctx context.Context, firstBucketID, count uint64) error
- func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketStatInfo, error)
- func (rs *Replicaset) BucketsCount(ctx context.Context) (uint64, error)
- func (rs *Replicaset) CallAsync(ctx context.Context, opts ReplicasetCallOpts, fnc string, args interface{}) *tarantool.Future
- func (rs *Replicaset) Pooler() pool.Pooler
- func (rs *Replicaset) String() string
- type ReplicasetCallOpts
- type ReplicasetInfo
- type Router
- func (r *Router) AddInstance(ctx context.Context, rsName string, info InstanceInfo) error
- func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error
- func (r *Router) AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error
- func (r *Router) BucketCount() uint64
- func (r *Router) BucketIDStrCRC32(shardKey string) uint64
- func (r *Router) BucketReset(bucketID uint64)
- func (r *Router) BucketSet(bucketID uint64, rsName string) (*Replicaset, error)
- func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode, fnc string, ...) (VshardRouterCallResp, error)
- func (r *Router) CallBRE(ctx context.Context, bucketID uint64, fnc string, args interface{}, ...) (VshardRouterCallResp, error)
- func (r *Router) CallBRO(ctx context.Context, bucketID uint64, fnc string, args interface{}, ...) (VshardRouterCallResp, error)
- func (r *Router) CallRE(ctx context.Context, bucketID uint64, fnc string, args interface{}, ...) (VshardRouterCallResp, error)
- func (r *Router) CallRO(ctx context.Context, bucketID uint64, fnc string, args interface{}, ...) (VshardRouterCallResp, error)
- func (r *Router) CallRW(ctx context.Context, bucketID uint64, fnc string, args interface{}, ...) (VshardRouterCallResp, error)
- func (r *Router) ClusterBootstrap(ctx context.Context, ifNotBootstrapped bool) error
- func (r *Router) DiscoveryAllBuckets(ctx context.Context) error
- func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buckets []uint64)
- func (r *Router) Do(req *CallRequest, userMode pool.Mode) *CallResponse
- func (r *Router) RemoveInstance(ctx context.Context, rsName, instanceName string) error
- func (r *Router) RemoveReplicaset(ctx context.Context, rsName string) []error
- func (r *Router) Route(ctx context.Context, bucketID uint64) (*Replicaset, error)
- func (r *Router) RouteAll() map[string]*Replicaset
- func (r *Router) RouteMapClean()
- func (r *Router) Topology() TopologyController
- type RouterMapCallRWOptions
- type SlogLoggerf
- type StdoutLogLevel
- type StdoutLoggerf
- type StorageCallVShardError
- type TopologyController
- type TopologyProvider
- type VshardMode
- type VshardRouterCallResp
Constants ¶
const ( VShardErrCodeWrongBucket = 1 VShardErrCodeNonMaster = 2 VShardErrCodeBucketAlreadyExists = 3 VShardErrCodeNoSuchReplicaset = 4 VShardErrCodeMoveToSelf = 5 VShardErrCodeMissingMaster = 6 VShardErrCodeTransferIsInProgress = 7 VShardErrCodeUnreachableReplicaset = 8 VShardErrCodeNoRouteToBucket = 9 VShardErrCodeNonEmpty = 10 VShardErrCodeUnreachableMaster = 11 VShardErrCodeOutOfSync = 12 VShardErrCodeHighReplicationLag = 13 VShardErrCodeUnreachableReplica = 14 VShardErrCodeLowRedundancy = 15 VShardErrCodeInvalidRebalancing = 16 VShardErrCodeSuboptimalReplica = 17 VShardErrCodeUnknownBuckets = 18 VShardErrCodeReplicasetIsLocked = 19 VShardErrCodeObjectIsOutdated = 20 VShardErrCodeRouterAlreadyExists = 21 VShardErrCodeBucketIsLocked = 22 VShardErrCodeInvalidCfg = 23 VShardErrCodeBucketIsPinned = 24 VShardErrCodeTooManyReceiving = 25 VShardErrCodeStorageIsReferenced = 26 VShardErrCodeStorageRefAdd = 27 VShardErrCodeStorageRefUse = 28 VShardErrCodeStorageRefDel = 29 VShardErrCodeBucketRecvDataError = 30 VShardErrCodeMultipleMastersFound = 31 VShardErrCodeReplicasetInBackoff = 32 VShardErrCodeStorageIsDisabled = 33 VShardErrCodeBucketIsCorrupted = 34 VShardErrCodeRouterIsDisabled = 35 VShardErrCodeBucketGCError = 36 VShardErrCodeStorageCfgIsInProgress = 37 VShardErrCodeRouterCfgIsInProgress = 38 VShardErrCodeBucketInvalidUpdate = 39 VShardErrCodeVhandshakeNotComplete = 40 VShardErrCodeInstanceNameMismatch = 41 )
VShard error codes
const ( VShardErrNameWrongBucket = "WRONG_BUCKET" VShardErrNameNonMaster = "NON_MASTER" VShardErrNameBucketAlreadyExists = "BUCKET_ALREADY_EXISTS" VShardErrNameNoSuchReplicaset = "NO_SUCH_REPLICASET" VShardErrNameMoveToSelf = "MOVE_TO_SELF" VShardErrNameMissingMaster = "MISSING_MASTER" VShardErrNameTransferIsInProgress = "TRANSFER_IS_IN_PROGRESS" VShardErrNameUnreachableReplicaset = "UNREACHABLE_REPLICASET" VShardErrNameNoRouteToBucket = "NO_ROUTE_TO_BUCKET" VShardErrNameNonEmpty = "NON_EMPTY" VShardErrNameUnreachableMaster = "UNREACHABLE_MASTER" VShardErrNameOutOfSync = "OUT_OF_SYNC" VShardErrNameHighReplicationLag = "HIGH_REPLICATION_LAG" VShardErrNameUnreachableReplica = "UNREACHABLE_REPLICA" VShardErrNameLowRedundancy = "LOW_REDUNDANCY" VShardErrNameInvalidRebalancing = "INVALID_REBALANCING" VShardErrNameSuboptimalReplica = "SUBOPTIMAL_REPLICA" VShardErrNameUnknownBuckets = "UNKNOWN_BUCKETS" VShardErrNameReplicasetIsLocked = "REPLICASET_IS_LOCKED" VShardErrNameObjectIsOutdated = "OBJECT_IS_OUTDATED" VShardErrNameRouterAlreadyExists = "ROUTER_ALREADY_EXISTS" VShardErrNameBucketIsLocked = "BUCKET_IS_LOCKED" VShardErrNameInvalidCfg = "INVALID_CFG" VShardErrNameBucketIsPinned = "BUCKET_IS_PINNED" VShardErrNameTooManyReceiving = "TOO_MANY_RECEIVING" VShardErrNameStorageIsReferenced = "STORAGE_IS_REFERENCED" VShardErrNameStorageRefAdd = "STORAGE_REF_ADD" VShardErrNameStorageRefUse = "STORAGE_REF_USE" VShardErrNameStorageRefDel = "STORAGE_REF_DEL" VShardErrNameBucketRecvDataError = "BUCKET_RECV_DATA_ERROR" VShardErrNameMultipleMastersFound = "MULTIPLE_MASTERS_FOUND" VShardErrNameReplicasetInBackoff = "REPLICASET_IN_BACKOFF" VShardErrNameStorageIsDisabled = "STORAGE_IS_DISABLED" VShardErrNameBucketIsCorrupted = "BUCKET_IS_CORRUPTED" VShardErrNameRouterIsDisabled = "ROUTER_IS_DISABLED" VShardErrNameBucketGCError = "BUCKET_GC_ERROR" VShardErrNameStorageCfgIsInProgress = "STORAGE_CFG_IS_IN_PROGRESS" VShardErrNameRouterCfgIsInProgress = "ROUTER_CFG_IS_IN_PROGRESS" VShardErrNameBucketInvalidUpdate = "BUCKET_INVALID_UPDATE" VShardErrNameVhandshakeNotComplete = "VHANDSHAKE_NOT_COMPLETE" VShardErrNameInstanceNameMismatch = "INSTANCE_NAME_MISMATCH" )
VShard error names
Variables ¶
var ( ErrReplicasetExists = fmt.Errorf("replicaset already exists") ErrReplicasetNotExists = fmt.Errorf("replicaset not exists") )
var ( // ErrInvalidConfig is returned when the configuration is invalid. ErrInvalidConfig = fmt.Errorf("config invalid") // ErrInvalidInstanceInfo is returned when the instance information is invalid. ErrInvalidInstanceInfo = fmt.Errorf("invalid instance info") // ErrInvalidReplicasetInfo is returned when the replicaset information is invalid. ErrInvalidReplicasetInfo = fmt.Errorf("invalid replicaset info") // ErrTopologyProvider is returned when there is an error from the topology provider. ErrTopologyProvider = fmt.Errorf("got error from topology provider") )
Functions ¶
func BucketIDStrCRC32 ¶
func CalculateEtalonBalance ¶
func CalculateEtalonBalance(replicasets []Replicaset, bucketCount uint64) error
CalculateEtalonBalance computes the ideal bucket count for each replicaset. This iterative algorithm seeks the optimal balance within a cluster by calculating the ideal bucket count for each replicaset at every step. If the ideal count cannot be achieved due to pinned buckets, the algorithm makes a best effort to approximate balance by ignoring the replicaset with pinned buckets and its associated pinned count. After each iteration, a new balance is recalculated. However, this can lead to scenarios where the conditions are still unmet; ignoring pinned buckets in overloaded replicasets can reduce the ideal bucket count in others, potentially causing new values to fall below their pinned count.
At each iteration, the algorithm either concludes or disregards at least one new overloaded replicaset. Therefore, its time complexity is O(N^2), where N is the number of replicasets. based on https://github.com/tarantool/vshard/blob/99ceaee014ea3a67424c2026545838e08d69b90c/vshard/replicaset.lua#L1358
func RouterMapCallRW ¶
func RouterMapCallRW[T any](r *Router, ctx context.Context, fnc string, args interface{}, opts RouterMapCallRWOptions, ) (map[string]T, error)
RouterMapCallRW is a consistent Map-Reduce. The given function is called on all masters in the cluster with a guarantee that in case of success it was executed with all buckets being accessible for reads and writes. T is a return type of user defined function 'fnc'. We define it as a distinct function, not a Router method, because golang limitations, see: https://github.com/golang/go/issues/49085.
Types ¶
type BucketStatInfo ¶
func (*BucketStatInfo) DecodeMsgpack ¶
func (bsi *BucketStatInfo) DecodeMsgpack(d *msgpack.Decoder) error
tnt vshard storage returns map with 'int' keys for bucketStatInfo, example: map[id:48 status:active 1:48 2:active]. But msgpackv5 supports only string keys when decoding maps into structs, see issue: https://github.com/vmihailenco/msgpack/issues/372 To workaround this we decode BucketStatInfo manually. When the issue above will be resolved, this code can be (and should be) deleted.
type BucketsSearchMode ¶
type BucketsSearchMode int
BucketsSearchMode a type, that used to define policy for Router.Route method. See type Config for further details.
const ( // BucketsSearchLegacy implements the same logic as lua router: // send bucket_stat request to every replicaset, // return a response immediately if any of them succeed. BucketsSearchLegacy BucketsSearchMode = iota // BucketsSearchBatchedQuick and BucketsSearchBatchedFull implement another logic: // send buckets_discovery request to every replicaset with from=bucketID, // seek our bucketID in their responses. // Additionally, store other bucketIDs in the route map. // BucketsSearchBatchedQuick stops iterating over replicasets responses as soon as our bucketID is found. BucketsSearchBatchedQuick // BucketsSearchBatchedFull implements the same logic as BucketsSearchBatchedQuick, // but doesn't stop iterating over replicasets responses as soon as our bucketID is found. // Instead, it always iterates over all replicasets responses even bucketID is found. BucketsSearchBatchedFull )
type CallMode ¶
type CallMode int
CallMode is a type to represent call mode for Router.Call method.
const ( // CallModeRO sets a read-only mode for Router.Call. CallModeRO CallMode = iota // CallModeRW sets a read-write mode for Router.Call. CallModeRW // CallModeRE acts like CallModeRO // with preference for a replica rather than a master. // This mode is not supported yet. CallModeRE // CallModeBRO acts like CallModeRO with balancing. CallModeBRO // CallModeBRE acts like CallModeRO with balancing // and preference for a replica rather than a master. CallModeBRE )
type CallRequest ¶
type CallRequest struct {
// contains filtered or unexported fields
}
CallRequest helps you to create a call request object for execution by a Connection.
func NewCallRequest ¶
func NewCallRequest(function string) *CallRequest
NewCallRequest returns a new empty CallRequest.
func (*CallRequest) Args ¶
func (req *CallRequest) Args(args interface{}) *CallRequest
Args sets the args for the eval request. Note: default value is empty.
func (*CallRequest) BucketID ¶
func (req *CallRequest) BucketID(bucketID uint64) *CallRequest
BucketID method that sets the bucketID for your request. You can ignore this parameter if you have a bucketGetter. However, this method has a higher priority.
func (*CallRequest) Context ¶
func (req *CallRequest) Context(ctx context.Context) *CallRequest
Context sets a passed context to the request.
type CallResponse ¶
type CallResponse struct {
// contains filtered or unexported fields
}
CallResponse is a backwards-compatible structure with go-tarantool for easier replacement.
func (*CallResponse) Get ¶
func (resp *CallResponse) Get() ([]interface{}, error)
Get implementation now works synchronously for response. The interface was created purely for convenient migration to go-vshard-router from go-tarantool.
func (*CallResponse) GetTyped ¶
func (resp *CallResponse) GetTyped(result interface{}) error
GetTyped waits synchronously for response and calls msgpack.Decoder.Decode(result) if no error happens.
type Config ¶
type Config struct { // Providers // Loggerf injects a custom logger. By default there is no logger is used. Loggerf LogfProvider // Loggerf is not required Metrics MetricsProvider // Metrics is not required TopologyProvider TopologyProvider // TopologyProvider is required provider // Discovery // DiscoveryTimeout is timeout between cron discovery job; by default there is no timeout. DiscoveryTimeout time.Duration DiscoveryMode DiscoveryMode // DiscoveryWorkStep is a pause between calling buckets_discovery on storage // in buckets discovering logic. Default is 10ms. DiscoveryWorkStep time.Duration // BucketsSearchMode defines policy for Router.Route method. // Default value is BucketsSearchLegacy. // See BucketsSearchMode constants for more detail. BucketsSearchMode BucketsSearchMode TotalBucketCount uint64 User string Password string PoolOpts tarantool.Opts // BucketGetter is an optional argument. // You can specify a function that will receive the bucket id from the context. // This is useful if you use middleware that inserts the calculated bucket id into the request context. BucketGetter func(ctx context.Context) uint64 // RequestTimeout timeout for requests to Tarantool. // Don't rely on using this timeout. // This is the difference between the timeout of the library itself // that is, our retry timeout if the buckets, for example, move. // Currently, it only works for sugar implementations . RequestTimeout time.Duration }
type DiscoveryMode ¶
type DiscoveryMode int
const ( // DiscoveryModeOn is cron discovery with cron timeout DiscoveryModeOn DiscoveryMode = iota DiscoveryModeOnce )
type EmptyMetrics ¶
type EmptyMetrics struct{}
EmptyMetrics is default empty metrics provider you can embed this type and realize just some metrics
func (*EmptyMetrics) CronDiscoveryEvent ¶
func (e *EmptyMetrics) CronDiscoveryEvent(_ bool, _ time.Duration, _ string)
func (*EmptyMetrics) RequestDuration ¶
func (e *EmptyMetrics) RequestDuration(_ time.Duration, _ string, _, _ bool)
func (*EmptyMetrics) RetryOnCall ¶
func (e *EmptyMetrics) RetryOnCall(_ string)
type InstanceInfo ¶
type InstanceInfo struct { // Name is a required field for the instance. // Starting with Tarantool 3.0, this definition is made into a human-readable name, // and it is now mandatory for use in the library. // The Name should be a unique identifier for the instance. Name string // Addr specifies the address of the instance. // This can be an IP address or a domain name, depending on how the instance is accessed. // It is necessary for connecting to the instance or identifying its location. Addr string // UUID is an optional field that provides a globally unique identifier (UUID) for the instance. // While this information is not mandatory, it can be useful for internal management or tracking purposes. // The UUID ensures that each instance can be identified uniquely, but it is not required for basic operations. UUID uuid.UUID }
InstanceInfo represents the information about an instance. This struct holds the necessary details such as the name, address, and UUID of the instance.
func (InstanceInfo) String ¶
func (ii InstanceInfo) String() string
func (InstanceInfo) Validate ¶
func (ii InstanceInfo) Validate() error
type LogfProvider ¶
type LogfProvider interface { Debugf(ctx context.Context, format string, v ...any) Infof(ctx context.Context, format string, v ...any) Warnf(ctx context.Context, format string, v ...any) Errorf(ctx context.Context, format string, v ...any) }
LogfProvider an interface to inject a custom logger.
func NewSlogLogger ¶ added in v2.0.2
func NewSlogLogger(logger *slog.Logger) LogfProvider
NewSlogLogger wraps slog logger
type MetricsProvider ¶
type MetricsProvider interface { CronDiscoveryEvent(ok bool, duration time.Duration, reason string) RetryOnCall(reason string) RequestDuration(duration time.Duration, procedure string, ok, mapReduce bool) }
MetricsProvider is an interface for passing library metrics to your prometheus/graphite and other metrics. This logic is experimental and may be changed in the release.
type Pooler ¶ added in v2.0.5
type Pooler interface { pool.Pooler // GetInfo is an addition to the standard interface that allows you // to get the current state of the connection pool. // This is necessary for proper operation with topology providers // for adding or removing instances. GetInfo() map[string]pool.ConnectionInfo }
Pooler is an interface for the tarantool.Pool wrapper, which is necessary for the correct operation of the library.
type Replicaset ¶
type Replicaset struct { EtalonBucketCount uint64 // contains filtered or unexported fields }
func (*Replicaset) BucketForceCreate ¶
func (rs *Replicaset) BucketForceCreate(ctx context.Context, firstBucketID, count uint64) error
func (*Replicaset) BucketStat ¶
func (rs *Replicaset) BucketStat(ctx context.Context, bucketID uint64) (BucketStatInfo, error)
func (*Replicaset) BucketsCount ¶
func (rs *Replicaset) BucketsCount(ctx context.Context) (uint64, error)
func (*Replicaset) CallAsync ¶
func (rs *Replicaset) CallAsync(ctx context.Context, opts ReplicasetCallOpts, fnc string, args interface{}) *tarantool.Future
CallAsync sends async request to remote storage
func (*Replicaset) Pooler ¶
func (rs *Replicaset) Pooler() pool.Pooler
func (*Replicaset) String ¶
func (rs *Replicaset) String() string
type ReplicasetCallOpts ¶
type ReplicasetInfo ¶
type ReplicasetInfo struct { // Name — the name of the replicaset. // This string is required and is used to identify the replicaset. Name string // UUID — the unique identifier of the replica. // This is an optional value that can be used to uniquely distinguish each replicaset. UUID uuid.UUID // Weight — the weight of the replicaset. // This floating-point number may be used to determine the importance or priority of the replicaset. Weight float64 // PinnedCount — the number of pinned items. // This value indicates how many items or tasks are associated with the replicaset. PinnedCount uint64 // IgnoreDisbalance — a flag indicating whether to ignore load imbalance when distributing tasks. // If true, the replicaset will be excluded from imbalance checks. IgnoreDisbalance bool }
ReplicasetInfo represents information about a replicaset, including its name, unique identifier, weight, and state.
func (ReplicasetInfo) String ¶
func (ri ReplicasetInfo) String() string
func (ReplicasetInfo) Validate ¶
func (ri ReplicasetInfo) Validate() error
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
func NewRouter ¶
NewRouter - the main library function. It creates a Router for direct request routing in a sharded Tarantool cluster. It connects to masters and replicas for load distribution and fault tolerance.
func (*Router) AddInstance ¶
func (*Router) AddReplicaset ¶
func (r *Router) AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error
func (*Router) AddReplicasets ¶
func (r *Router) AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error
func (*Router) BucketCount ¶
BucketCount returns the total number of buckets specified in cfg.
func (*Router) BucketIDStrCRC32 ¶
BucketIDStrCRC32 return the bucket identifier from the parameter used for sharding.
func (*Router) BucketReset ¶
func (*Router) BucketSet ¶
func (r *Router) BucketSet(bucketID uint64, rsName string) (*Replicaset, error)
BucketSet Set a bucket to a replicaset.
func (*Router) Call ¶
func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode, fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error)
Call calls the function identified by 'fnc' on the shard storing the bucket identified by 'bucket_id'.
func (*Router) CallBRE ¶
func (r *Router) CallBRE(ctx context.Context, bucketID uint64, fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error)
CallBRE is an alias for Call with CallModeBRE.
func (*Router) CallBRO ¶
func (r *Router) CallBRO(ctx context.Context, bucketID uint64, fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error)
CallBRO is an alias for Call with CallModeBRO.
func (*Router) CallRE ¶
func (r *Router) CallRE(ctx context.Context, bucketID uint64, fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error)
CallRE is an alias for Call with CallModeRE.
func (*Router) CallRO ¶
func (r *Router) CallRO(ctx context.Context, bucketID uint64, fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error)
CallRO is an alias for Call with CallModeRO.
func (*Router) CallRW ¶
func (r *Router) CallRW(ctx context.Context, bucketID uint64, fnc string, args interface{}, opts CallOpts) (VshardRouterCallResp, error)
CallRW is an alias for Call with CallModeRW.
func (*Router) ClusterBootstrap ¶
ClusterBootstrap initializes the cluster by bootstrapping the necessary buckets across the available replicasets. It checks the current state of each replicaset and creates buckets if required. The function takes a context for managing cancellation and deadlines, and a boolean parameter ifNotBootstrapped to control error handling. If ifNotBootstrapped is true, the function will log any errors encountered during the bootstrapping process but will not halt execution; instead, it will return the last error encountered. If ifNotBootstrapped is false, any error will result in an immediate return, ensuring that the operation either succeeds fully or fails fast. Deprecated: use lua bootstrap now, go-router bootstrap now works invalid.
func (*Router) DiscoveryAllBuckets ¶
func (*Router) DiscoveryHandleBuckets ¶
func (r *Router) DiscoveryHandleBuckets(ctx context.Context, rs *Replicaset, buckets []uint64)
DiscoveryHandleBuckets arrange downloaded buckets to the route map so as they reference a given replicaset.
func (*Router) Do ¶
func (r *Router) Do(req *CallRequest, userMode pool.Mode) *CallResponse
Do perform a request synchronously on the connection. It is important that the logic of this method is different from go-tarantool.
func (*Router) RemoveInstance ¶
RemoveInstance removes a specific instance from the router topology within a replicaset. It takes a context, the replicaset name (rsName), and the instance name (instanceName) as inputs. If the replicaset name is empty, it searches through all replica sets to locate the instance. Returns an error if the specified replicaset does not exist or if any issue occurs during removal.
func (*Router) RemoveReplicaset ¶
func (*Router) RouteAll ¶
func (r *Router) RouteAll() map[string]*Replicaset
RouteAll return map of all replicasets.
func (*Router) RouteMapClean ¶
func (r *Router) RouteMapClean()
func (*Router) Topology ¶
func (r *Router) Topology() TopologyController
type RouterMapCallRWOptions ¶
type RouterMapCallRWOptions struct { // Timeout defines timeout for RouterMapCallRW. Timeout time.Duration }
RouterMapCallRWOptions sets options for RouterMapCallRW.
type SlogLoggerf ¶ added in v2.0.2
SlogLoggerf is adapter for slog to Logger interface.
func (*SlogLoggerf) Debugf ¶ added in v2.0.2
func (s *SlogLoggerf) Debugf(ctx context.Context, format string, v ...any)
Debugf implements Debugf method for LogfProvider interface
func (SlogLoggerf) Errorf ¶ added in v2.0.2
func (s SlogLoggerf) Errorf(ctx context.Context, format string, v ...any)
Errorf implements Errorf method for LogfProvider interface
type StdoutLogLevel ¶
type StdoutLogLevel int
StdoutLogLevel is a type to control log level for StdoutLoggerf.
const ( // StdoutLogDefault is equal to default value of StdoutLogLevel. Acts like StdoutLogInfo. StdoutLogDefault StdoutLogLevel = iota // StdoutLogDebug enables debug or higher level logs for StdoutLoggerf StdoutLogDebug // StdoutLogInfo enables only info or higher level logs for StdoutLoggerf StdoutLogInfo // StdoutLogWarn enables only warn or higher level logs for StdoutLoggerf StdoutLogWarn // StdoutLogError enables error level logs for StdoutLoggerf StdoutLogError )
type StdoutLoggerf ¶
type StdoutLoggerf struct { // LogLevel controls log level to print, see StdoutLogLevel constants for details. LogLevel StdoutLogLevel }
StdoutLoggerf a logger that prints into stderr
func (StdoutLoggerf) Debugf ¶
func (s StdoutLoggerf) Debugf(_ context.Context, format string, v ...any)
Debugf implements Debugf method for LogfProvider interface
func (StdoutLoggerf) Errorf ¶
func (s StdoutLoggerf) Errorf(_ context.Context, format string, v ...any)
Errorf implements Errorf method for LogfProvider interface
type StorageCallVShardError ¶
type StorageCallVShardError struct { BucketID uint64 `msgpack:"bucket_id"` Reason string `msgpack:"reason"` Code int `msgpack:"code"` Type string `msgpack:"type"` Message string `msgpack:"message"` Name string `msgpack:"name"` // These 3 fields below are send as string by vshard storage, so we decode them into string, not uuid.UUID type // Example: 00000000-0000-0002-0002-000000000000 MasterUUID string `msgpack:"master"` ReplicasetUUID string `msgpack:"replicaset"` ReplicaUUID string `msgpack:"replica"` Destination string `msgpack:"destination"` }
func (StorageCallVShardError) Error ¶
func (s StorageCallVShardError) Error() string
type TopologyController ¶
type TopologyController interface { AddInstance(ctx context.Context, rsName string, info InstanceInfo) error RemoveReplicaset(ctx context.Context, rsName string) []error RemoveInstance(ctx context.Context, rsName, instanceName string) error AddReplicaset(ctx context.Context, rsInfo ReplicasetInfo, instances []InstanceInfo) error AddReplicasets(ctx context.Context, replicasets map[ReplicasetInfo][]InstanceInfo) error }
TopologyController is an entity that allows you to interact with the topology. TopologyController is not concurrent safe. This decision is made intentionally because there is no point in providing concurrence safety for this case. In any case, a caller can use his own external synchronization primitive to handle concurrent access.
type TopologyProvider ¶
type TopologyProvider interface { // Init should create the current topology at the beginning // and change the state during the process of changing the point of receiving the cluster configuration Init(t TopologyController) error // Close closes all connections if the provider created them Close() }
TopologyProvider is external module that can lookup current topology of cluster it might be etcd/config/consul or smth else
type VshardMode ¶
type VshardMode string
const ( ReadMode VshardMode = "read" WriteMode VshardMode = "write" )
func (VshardMode) String ¶
func (c VshardMode) String() string
type VshardRouterCallResp ¶
type VshardRouterCallResp struct {
// contains filtered or unexported fields
}
VshardRouterCallResp represents a response from Router.Call[XXX] methods.
func (VshardRouterCallResp) Get ¶
func (r VshardRouterCallResp) Get() ([]interface{}, error)
Get returns a response from user defined function as []interface{}.
func (VshardRouterCallResp) GetTyped ¶
func (r VshardRouterCallResp) GetTyped(result interface{}) error
GetTyped decodes a response from user defined function into custom values.
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
mocks
|
|
providers
|
|
etcd
Package etcd based on moonlibs config library https://github.com/moonlibs/config?tab=readme-ov-file#multi-shard-topology-for-custom-sharding-etcdclustermaster
|
Package etcd based on moonlibs config library https://github.com/moonlibs/config?tab=readme-ov-file#multi-shard-topology-for-custom-sharding-etcdclustermaster |
nolint:revive
|
nolint:revive |