zanredisdb

package module
v0.6.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 16, 2021 License: MIT Imports: 20 Imported by: 3

README

go-zanredisdb

GoDoc GitHub release

The official Go SDK package for ZanRedisDB.

Docs

See godoc and the test case for examples of client usage.

Tests

Run go test (which requires the ZanRedisDB cluster inited with the test namespace).

Documentation

Index

Constants

View Source
const (
	DefaultConnPoolMaxActive = 400
	DefaultConnPoolMaxIdle   = 3
)
View Source
const (
	LOG_ERR int32 = iota
	LOG_INFO
	LOG_DEBUG
)
View Source
const (
	MinRetrySleep = time.Millisecond * 5
)

Variables

View Source
var (
	RetryFailedInterval     = time.Second * 5
	MaxRetryInterval        = time.Minute
	NextRetryFailedInterval = time.Minute * 2
	ErrCntForStopRW         = 3
	LargeKeyPoolNum         = 4
)
View Source
var (
	FailedOnClusterChanged = "ERR_CLUSTER_CHANGED"
	FailedOnNotLeader      = "E_FAILED_ON_NOT_LEADER"
	FailedOnNotWritable    = "E_FAILED_ON_NOT_WRITABLE"
	FailedOnNodeStopped    = "the node stopped"
)
View Source
var ErrInvalidKey = errors.New("invalid key format")
View Source
var ErrSizeExceedLimit = errors.New("key value size exceeded the limit")

Functions

func DoRedisCmd

func DoRedisCmd(conn redis.Conn, cmdName string, args ...interface{}) (reply interface{}, err error)

func FindString

func FindString(src []string, f string) int

func GetHashedPartitionID

func GetHashedPartitionID(pk []byte, pnum int) int

func IsConnectClosed added in v0.4.0

func IsConnectClosed(err error) bool

func IsConnectRefused

func IsConnectRefused(err error) bool

func IsFailedOnClusterChanged

func IsFailedOnClusterChanged(err error) bool

func IsFailedOnNotWritable

func IsFailedOnNotWritable(err error) bool

func IsSlowCmd added in v0.3.0

func IsSlowCmd(cmd string) bool

func IsTimeoutErr added in v0.5.0

func IsTimeoutErr(err error) bool

func SetLogger

func SetLogger(level int32, l Logger)

should call only once before any proxy started.

func ValidPrefix added in v0.3.5

func ValidPrefix(k string) bool

Types

type Cluster

type Cluster struct {
	sync.RWMutex

	LookupList []string
	// contains filtered or unexported fields
}

func NewCluster

func NewCluster(conf *Conf, largeConf *LargeKeyConf) *Cluster

func (*Cluster) ChangeMaxActive added in v0.3.0

func (cluster *Cluster) ChangeMaxActive(active int)

func (*Cluster) Close

func (cluster *Cluster) Close()

func (*Cluster) GetAllHostsByPart

func (cluster *Cluster) GetAllHostsByPart(pid int) ([]*RedisHost, error)

func (*Cluster) GetConn

func (cluster *Cluster) GetConn(pk []byte, leader bool, tryLocalForRead bool, isSlowQuery bool) (redis.Conn, error)

func (*Cluster) GetConnForLarge added in v0.3.0

func (cluster *Cluster) GetConnForLarge(pk []byte, leader bool, tryLocalForRead bool, vsize int) (redis.Conn, error)

func (*Cluster) GetConnsByHosts

func (cluster *Cluster) GetConnsByHosts(hosts []string, isSlowQuery bool) ([]redis.Conn, error)

func (*Cluster) GetConnsForAllParts

func (cluster *Cluster) GetConnsForAllParts(isSlowQuery bool) ([]redis.Conn, error)

func (*Cluster) GetHostAndConn

func (cluster *Cluster) GetHostAndConn(pk []byte, leader bool, tryLocalForRead bool, isSlowQuery bool) (*RedisHost, redis.Conn, error)

func (*Cluster) GetHostAndConnForLarge added in v0.3.0

func (cluster *Cluster) GetHostAndConnForLarge(pk []byte, leader bool, tryLocalForRead bool, vsize int) (*RedisHost, redis.Conn, error)

func (*Cluster) GetHostByPart

func (cluster *Cluster) GetHostByPart(pid int, leader bool) (*RedisHost, error)

func (*Cluster) GetHostStats added in v0.3.0

func (cluster *Cluster) GetHostStats() map[string]HostStats

func (*Cluster) GetNodeHost

func (cluster *Cluster) GetNodeHost(pk []byte, leader bool, tryLocalForRead bool) (*RedisHost, error)

func (*Cluster) GetPartitionNum

func (cluster *Cluster) GetPartitionNum() int

func (*Cluster) IsSameDCFirst added in v0.3.0

func (cluster *Cluster) IsSameDCFirst() bool

func (*Cluster) IsStopped added in v0.3.5

func (cluster *Cluster) IsStopped() bool

func (*Cluster) MaybeTriggerCheckForError

func (cluster *Cluster) MaybeTriggerCheckForError(err error, delay time.Duration) bool

type Conf

type Conf struct {
	LookupList []string
	// multi conf and lookuplist should not be used both
	MultiConf        MultiClusterConf
	DialTimeout      time.Duration
	ReadTimeout      time.Duration
	RangeReadTimeout time.Duration
	WriteTimeout     time.Duration
	IdleTimeout      time.Duration
	MaxConnWait      time.Duration
	MaxRetryGetConn  int
	MaxActiveConn    int
	// idle num that will be kept for all idle connections
	MaxIdleConn int
	// default 0.4
	RangeConnRatio float64
	TendInterval   int64
	Namespace      string
	Password       string
	// the datacenter info for client
	// will be used for a single cluster acrossing datacenter
	DC string
}

func NewDefaultConf added in v0.4.0

func NewDefaultConf() *Conf

func (*Conf) CheckValid added in v0.3.0

func (conf *Conf) CheckValid() error

type HashElem

type HashElem struct {
	Field []byte
	Value []byte
}

type HostStats added in v0.3.0

type HostStats struct {
	PoolCnt          int   `json:"pool_cnt,omitempty"`
	PoolWaitCnt      int   `json:"pool_wait_cnt,omitempty"`
	RangePoolCnt     int   `json:"range_pool_cnt,omitempty"`
	RangePoolWaitCnt int   `json:"range_pool_wait_cnt,omitempty"`
	LargeKeyPoolCnt  []int `json:"large_key_pool_cnt,omitempty"`
	FailedCnt        int64 `json:"failed_cnt,omitempty"`
	FailedTs         int64 `json:"failed_cnt,omitempty"`
}

type LargeKeyConf added in v0.3.0

type LargeKeyConf struct {
	MinPoolSize               int
	GetConnTimeoutForLargeKey time.Duration
	MaxAllowedValueSize       int
}

This configuration will be used to isolate the large key write and exception key access. Write a value large than max allowed size will return error and for MaxAllowedValueSize > value > MaxAllowedValueSize/2, a isolated pool with only MinPoolSize connection will be used MaxAllowedValueSize/2 > value > MaxAllowedValueSize/4, a isolated pool with only 2*MinPoolSize connections will be used MaxAllowedValueSize/4 > value > MaxAllowedValueSize/8, a isolated pool with only 4*MinPoolSize connections will be used for command with more than 1024 arguments will use the isolated pool with only MinPoolSize connection for exception command will use the isolated pool with only MinPoolSize connection

func NewLargeKeyConf added in v0.3.0

func NewLargeKeyConf() *LargeKeyConf

type LevelLogger

type LevelLogger struct {
	Logger Logger
	// contains filtered or unexported fields
}

func NewLevelLogger

func NewLevelLogger(l int32, logger Logger) *LevelLogger

func (*LevelLogger) Debugf

func (self *LevelLogger) Debugf(f string, args ...interface{})

func (*LevelLogger) Detailf

func (self *LevelLogger) Detailf(f string, args ...interface{})

func (*LevelLogger) Errorf

func (self *LevelLogger) Errorf(f string, args ...interface{})

func (*LevelLogger) Errorln

func (self *LevelLogger) Errorln(f string)

func (*LevelLogger) Flush

func (self *LevelLogger) Flush()

func (*LevelLogger) Infof

func (self *LevelLogger) Infof(f string, args ...interface{})

func (*LevelLogger) Infoln

func (self *LevelLogger) Infoln(f string)

func (*LevelLogger) Level

func (self *LevelLogger) Level() int32

func (*LevelLogger) Printf

func (self *LevelLogger) Printf(f string, args ...interface{})

used only for wrap call (for other logger interface)

func (*LevelLogger) SetLevel

func (self *LevelLogger) SetLevel(l int32)

func (*LevelLogger) Warningf

func (self *LevelLogger) Warningf(f string, args ...interface{})

func (*LevelLogger) Warningln

func (self *LevelLogger) Warningln(f string)

type Logger

type Logger interface {
	Output(depth int, s string)
	OutputErr(depth int, s string)
	Flush()
}

type MultiClusterConf added in v0.3.0

type MultiClusterConf []RemoteClusterConf

func (MultiClusterConf) CheckValid added in v0.3.0

func (mcc MultiClusterConf) CheckValid() error

type NodeInfo

type NodeInfo struct {
	RegID             uint64
	ID                string
	NodeIP            string
	Hostname          string
	RedisPort         string
	HttpPort          string
	RpcPort           string
	RaftTransportAddr string
	Version           string
	Tags              map[string]bool
	DataRoot          string
	RsyncModule       string
	Epoch             int64
}

type PKey

type PKey struct {
	Namespace string
	Set       string
	PK        []byte
	RawKey    []byte
}

func NewPKey

func NewPKey(ns string, set string, pk []byte) *PKey

func ParsePKey added in v0.3.5

func ParsePKey(k string) (*PKey, error)

func (*PKey) ShardingKey

func (self *PKey) ShardingKey() []byte

func (*PKey) String

func (self *PKey) String() string

type PartitionAddrInfo

type PartitionAddrInfo struct {
	Leader         string
	Replicas       []string
	ReplicasDCInfo []string
	ReplicaInfos   []node
	// contains filtered or unexported fields
}

type PartitionAddrs

type PartitionAddrs struct {
	PNum  int
	PList []PartitionAddrInfo
}

type PartitionInfo

type PartitionInfo struct {
	Leader   *RedisHost
	Replicas []*RedisHost
	// contains filtered or unexported fields
}

type PartitionNodeInfo

type PartitionNodeInfo struct {
	Leader   node   `json:"leader"`
	Replicas []node `json:"replicas"`
}

type Partitions

type Partitions struct {
	PNum  int
	Epoch int64
	PList []PartitionInfo
}

type PipelineCmd

type PipelineCmd struct {
	CmdName     string
	ShardingKey []byte
	ToLeader    bool
	Args        []interface{}
}

type PipelineCmdList

type PipelineCmdList []PipelineCmd

func (*PipelineCmdList) Add

func (pl *PipelineCmdList) Add(cmd string, shardingKey []byte, toLeader bool,
	args ...interface{})

type PoolType added in v0.3.0

type PoolType int
const (
	DefPool PoolType = iota
	RangePool
	LargeKeyPool
)

type RedisHost

type RedisHost struct {
	// contains filtered or unexported fields
}

func (*RedisHost) Addr

func (rh *RedisHost) Addr() string

func (*RedisHost) ChangeMaxActive added in v0.3.0

func (rh *RedisHost) ChangeMaxActive(maxActive int, rangeRatio float64)

func (*RedisHost) CloseConn

func (rh *RedisHost) CloseConn()

func (*RedisHost) Conn added in v0.5.2

func (rh *RedisHost) Conn(poolType PoolType, hint int, retry int) (redis.Conn, error)

func (*RedisHost) ConnPool

func (rh *RedisHost) ConnPool(poolType PoolType) *redis.QueuePool

func (*RedisHost) GrpcAddr

func (rh *RedisHost) GrpcAddr() string

func (*RedisHost) IncSuccess

func (rh *RedisHost) IncSuccess()

func (*RedisHost) InitConnPool

func (rh *RedisHost) InitConnPool(
	newFn func() (redis.Conn, error),
	newRangeFn func() (redis.Conn, error),
	testBorrow func(redis.Conn, time.Time) error,
	conf *Conf, largeConf *LargeKeyConf)

func (*RedisHost) IsActive added in v0.5.0

func (rh *RedisHost) IsActive() bool

func (*RedisHost) MaybeIncFailed

func (rh *RedisHost) MaybeIncFailed(err error)

func (*RedisHost) Refresh

func (rh *RedisHost) Refresh()

func (*RedisHost) Stats added in v0.3.0

func (rh *RedisHost) Stats() HostStats

type RemoteClusterConf added in v0.3.0

type RemoteClusterConf struct {
	LookupList []string
	IsPrimary  bool
	ClusterDC  string
}

type ScanKey

type ScanKey struct {
	Namespace string
	Type      string
	Set       string
	Count     int
	Cursor    []byte
	RawKey    []byte
}

func NewScanKey

func NewScanKey(ns, set, t string, count int, cursor []byte) *ScanKey

func (*ScanKey) String

func (self *ScanKey) String() string

type SimpleLogger

type SimpleLogger struct {
	// contains filtered or unexported fields
}

func NewSimpleLogger

func NewSimpleLogger() *SimpleLogger

func (*SimpleLogger) Flush

func (self *SimpleLogger) Flush()

func (*SimpleLogger) Output

func (self *SimpleLogger) Output(depth int, s string)

func (*SimpleLogger) OutputErr

func (self *SimpleLogger) OutputErr(depth int, s string)

type ZSetElem

type ZSetElem struct {
	Member []byte
	Score  float64
}

type ZanRedisClient

type ZanRedisClient struct {
	// contains filtered or unexported fields
}

func NewZanRedisClient

func NewZanRedisClient(conf *Conf) (*ZanRedisClient, error)

func (*ZanRedisClient) AdvScan

func (client *ZanRedisClient) AdvScan(tp, set string, count int, cursor []byte) ([]byte, [][]byte, error)

func (*ZanRedisClient) AdvScanChannel

func (client *ZanRedisClient) AdvScanChannel(tp, set string, stopC chan struct{}) chan []byte

func (*ZanRedisClient) DoFullScan

func (client *ZanRedisClient) DoFullScan(cmd, tp, set string, count int, cursor []byte) ([]byte, []interface{}, error)

func (*ZanRedisClient) DoFullScanChannel

func (client *ZanRedisClient) DoFullScanChannel(tp, set string, stopC chan struct{}) chan interface{}

func (*ZanRedisClient) DoRedis

func (self *ZanRedisClient) DoRedis(cmd string, shardingKey []byte, toLeader bool,
	args ...interface{}) (interface{}, error)

func (*ZanRedisClient) DoRedisForException added in v0.3.0

func (self *ZanRedisClient) DoRedisForException(cmd string, shardingKey []byte, toLeader bool,
	args ...interface{}) (interface{}, error)

func (*ZanRedisClient) DoRedisForRead added in v0.3.0

func (self *ZanRedisClient) DoRedisForRead(cmd string, shardingKey []byte, toLeader bool,
	args ...interface{}) (interface{}, error)

func (*ZanRedisClient) DoRedisForWrite added in v0.3.0

func (self *ZanRedisClient) DoRedisForWrite(cmd string, shardingKey []byte, toLeader bool,
	args ...interface{}) (interface{}, error)

func (*ZanRedisClient) DoRedisTryLocalRead added in v0.3.0

func (self *ZanRedisClient) DoRedisTryLocalRead(cmd string, shardingKey []byte, toLeader bool,
	args ...interface{}) (interface{}, error)

DoRedisTryLocalRead should be used only for read command. If there is a local dc cluster and read the non-leader is allowed, it will try read local dc cluster first. If failed to read local dc for 2 times it will retry the primary dc cluster.

func (*ZanRedisClient) DoScan

func (client *ZanRedisClient) DoScan(cmd, tp, set string, count int, cursor []byte) ([]byte, [][]byte, error)

func (*ZanRedisClient) DoScanChannel

func (client *ZanRedisClient) DoScanChannel(cmd, tp, set string, stopC chan struct{}) chan []byte

func (*ZanRedisClient) FlushAndWaitPipelineCmd

func (self *ZanRedisClient) FlushAndWaitPipelineCmd(cmds PipelineCmdList) ([]interface{}, []error)

func (*ZanRedisClient) FullScan

func (client *ZanRedisClient) FullScan(tp, set string, count int, cursor []byte) ([]byte, []interface{}, error)

func (*ZanRedisClient) FullScanChannel

func (client *ZanRedisClient) FullScanChannel(tp, set string, stopC chan struct{}) chan interface{}

func (*ZanRedisClient) HScan

func (client *ZanRedisClient) HScan(set string, key []byte, count int, cursor []byte) ([]byte, []HashElem, error)

func (*ZanRedisClient) HScanChannel

func (client *ZanRedisClient) HScanChannel(set string, key []byte, stopC chan struct{}) chan []byte

func (*ZanRedisClient) KVDel

func (self *ZanRedisClient) KVDel(set string, key []byte) (int, error)

func (*ZanRedisClient) KVGet

func (self *ZanRedisClient) KVGet(set string, key []byte) ([]byte, error)

func (*ZanRedisClient) KVMDel

func (self *ZanRedisClient) KVMDel(readLeader bool, pKeys ...*PKey) (int64, error)

func (*ZanRedisClient) KVMExists

func (self *ZanRedisClient) KVMExists(readLeader bool, pKeys ...*PKey) (int64, error)

func (*ZanRedisClient) KVMGet

func (self *ZanRedisClient) KVMGet(readLeader bool, pKeys ...*PKey) ([][]byte, error)

func (*ZanRedisClient) KVScan

func (client *ZanRedisClient) KVScan(set string, count int, cursor []byte) ([]byte, [][]byte, error)

func (*ZanRedisClient) KVScanChannel

func (client *ZanRedisClient) KVScanChannel(set string, stopC chan struct{}) chan []byte

func (*ZanRedisClient) KVSet

func (self *ZanRedisClient) KVSet(set string, key []byte, value []byte) error

func (*ZanRedisClient) KVSetNX

func (self *ZanRedisClient) KVSetNX(set string, key []byte, value []byte) (int, error)

func (*ZanRedisClient) SScan

func (client *ZanRedisClient) SScan(set string, key []byte, count int, cursor []byte) ([]byte, [][]byte, error)

func (*ZanRedisClient) SScanChannel

func (client *ZanRedisClient) SScanChannel(set string, key []byte, stopC chan struct{}) chan []byte

func (*ZanRedisClient) SetLargeKeyConf added in v0.3.0

func (self *ZanRedisClient) SetLargeKeyConf(conf *LargeKeyConf)

func (*ZanRedisClient) Start

func (self *ZanRedisClient) Start()

func (*ZanRedisClient) Stats added in v0.6.1

func (self *ZanRedisClient) Stats() map[string]HostStats

func (*ZanRedisClient) Stop

func (self *ZanRedisClient) Stop()

func (*ZanRedisClient) SwitchSameDC

func (self *ZanRedisClient) SwitchSameDC(useSameDC bool)

while deploy across two datacenters, to improve read latency we can enable this, and set toLeader=false while calling DoRedis if there is no node in the same data center, we will fallback to the random node in other dc.

func (*ZanRedisClient) ZScan

func (client *ZanRedisClient) ZScan(set string, key []byte, count int, cursor []byte) ([]byte, []ZSetElem, error)

func (*ZanRedisClient) ZScanChannel

func (client *ZanRedisClient) ZScanChannel(set string, key []byte, stopC chan struct{}) chan ZSetElem

Directories

Path Synopsis
tools

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL