proxy

package
v0.0.0-...-ac4ab60 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 13, 2022 License: MIT Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
// Command flag bits. The values are untyped constants generated with
// 1 << iota, so FlagWrite = 1, FlagMasterOnly = 2, FlagMayWrite = 4 and
// FlagNotAllow = 8; each occupies its own bit and they can be OR-ed together.
// NOTE(review): presumably these are the bits stored in an OpFlag value — confirm.
const (
	FlagWrite = 1 << iota
	FlagMasterOnly
	FlagMayWrite
	FlagNotAllow
)
View Source
// Bit masks of type StatsFlags. The first three are generated with
// 1 << iota, so StatsCmds = 1, StatsSlots = 2 and StatsRuntime = 4.
// NOTE(review): presumably they select which sections a Stats/Overview call
// fills in — confirm against the implementation.
const (
	StatsCmds = StatsFlags(1 << iota)
	StatsSlots
	StatsRuntime

	// StatsFull has all 32 bits set, so it contains every mask above.
	StatsFull = StatsFlags(^uint32(0))
)
View Source
// DefaultConfig is a string constant. This rendered view elides its
// 3902-byte literal, so the actual contents are not visible here.
const DefaultConfig = `` /* 3902-byte string literal not displayed */
View Source
// DefaultRequestChanBuffer is the constant 128.
// NOTE(review): presumably the buffer size NewRequestChan uses when no
// explicit size is given — confirm.
const DefaultRequestChanBuffer = 128
View Source
// GOLDEN_RATIO_PRIME_32 is the constant 0x9e370001.
// NOTE(review): the name suggests a multiplier for a 32-bit multiplicative
// hash; its use is not visible here — confirm. The ALL_CAPS spelling is not
// idiomatic Go, but the name is exported and is therefore left as is.
const GOLDEN_RATIO_PRIME_32 = 0x9e370001
View Source
// MaxOpStrLen is the constant 64.
// NOTE(review): presumably the longest command name accepted before
// ErrBadOpStrLen is reported — confirm.
const MaxOpStrLen = 64
View Source
// MaxSlotNum re-exports models.MaxSlotNum under this package's name.
const MaxSlotNum = models.MaxSlotNum

Variables

View Source
// Sentinel errors created with errors.New. Compare against them by identity
// (== or errors.Is); the call sites that return them are not visible here.
var (
	ErrBackendConnReset = errors.New("backend conn reset")
	ErrRequestIsBroken  = errors.New("request is broken")
)
View Source
// Sentinel errors created with errors.New. Compare against them by identity
// (== or errors.Is); the call sites that return them are not visible here.
var (
	ErrSlotIsNotReady = errors.New("slot is not ready, may be offline")
	ErrRespIsRequired = errors.New("resp is required")
)
View Source
// Sentinel errors describing a malformed command, per their messages.
// Compare against them by identity (== or errors.Is).
var (
	ErrBadMultiBulk = errors.New("bad multi-bulk for command")
	ErrBadOpStrLen  = errors.New("bad command length, too short or too long")
)
View Source
// Sentinel errors whose messages describe invalid use of a router: a closed
// router, an invalid slot id, or an invalid forwarder method.
// Compare against them by identity (== or errors.Is).
var (
	ErrClosedRouter  = errors.New("use of closed router")
	ErrInvalidSlotId = errors.New("use of invalid slot id")
	ErrInvalidMethod = errors.New("use of invalid forwarder method")
)
View Source
// Sentinel errors whose messages describe a rejected session or request:
// the router is not online, or a session/pipeline limit was exceeded.
// NOTE(review): the limits are presumably Config.ProxyMaxClients and
// Config.SessionMaxPipeline — confirm.
var (
	ErrRouterNotOnline          = errors.New("router is not online")
	ErrTooManySessions          = errors.New("too many sessions")
	ErrTooManyPipelinedRequests = errors.New("too many pipelined requests")
)
View Source
// ErrClosedJodis is a sentinel error with the message "use of closed jodis".
// NOTE(review): presumably returned by Jodis methods called after Close — confirm.
var ErrClosedJodis = errors.New("use of closed jodis")
View Source
// ErrClosedProxy is a sentinel error with the message "use of closed proxy".
// NOTE(review): presumably returned by Proxy methods called after Close — confirm.
var ErrClosedProxy = errors.New("use of closed proxy")
View Source
// RespOK is a package-level value built once by redis.NewString from the
// bytes "OK".
// NOTE(review): it is a shared variable, so callers presumably must treat it
// as read-only — confirm.
var RespOK = redis.NewString([]byte("OK"))

Functions

func Hash

func Hash(key []byte) uint32

func OpFails

func OpFails() int64

func OpQPS

func OpQPS() int64

func OpRedisErrors

func OpRedisErrors() int64

func OpTotal

func OpTotal() int64

func ResetStats

func ResetStats()

func SessionsAlive

func SessionsAlive() int64

func SessionsTotal

func SessionsTotal() int64

Types

type ApiClient

// ApiClient is an opaque type: it exposes no exported fields.
// Obtain one with NewApiClient and use it through its methods.
type ApiClient struct {
	// contains filtered or unexported fields
}

func NewApiClient

func NewApiClient(addr string) *ApiClient

func (*ApiClient) FillSlots

func (c *ApiClient) FillSlots(slots ...*models.Slot) error

func (*ApiClient) ForceGC

func (c *ApiClient) ForceGC() error

func (*ApiClient) LogLevel

func (c *ApiClient) LogLevel(level log.LogLevel) error

func (*ApiClient) Model

func (c *ApiClient) Model() (*models.Proxy, error)

func (*ApiClient) Overview

func (c *ApiClient) Overview() (*Overview, error)

func (*ApiClient) ResetStats

func (c *ApiClient) ResetStats() error

func (*ApiClient) RewatchSentinels

func (c *ApiClient) RewatchSentinels() error

func (*ApiClient) SetSentinels

func (c *ApiClient) SetSentinels(sentinel *models.Sentinel) error

func (*ApiClient) SetXAuth

func (c *ApiClient) SetXAuth(name, auth string, token string)

func (*ApiClient) Shutdown

func (c *ApiClient) Shutdown() error

func (*ApiClient) Slots

func (c *ApiClient) Slots() ([]*models.Slot, error)

func (*ApiClient) Start

func (c *ApiClient) Start() error

func (*ApiClient) Stats

func (c *ApiClient) Stats(flags StatsFlags) (*Stats, error)

func (*ApiClient) StatsSimple

func (c *ApiClient) StatsSimple() (*Stats, error)

func (*ApiClient) XPing

func (c *ApiClient) XPing() error

type BackendConn

// BackendConn is an opaque type: it exposes no exported fields.
// Obtain one with NewBackendConn and use it through its methods.
type BackendConn struct {
	// contains filtered or unexported fields
}

func NewBackendConn

func NewBackendConn(addr string, database int, config *Config) *BackendConn

func (*BackendConn) Addr

func (bc *BackendConn) Addr() string

func (*BackendConn) Close

func (bc *BackendConn) Close()

func (*BackendConn) IsConnected

func (bc *BackendConn) IsConnected() bool

func (*BackendConn) KeepAlive

func (bc *BackendConn) KeepAlive() bool

func (*BackendConn) PushBack

func (bc *BackendConn) PushBack(r *Request)

type Config

// Config holds the proxy's settings.
//
// Every field carries a `toml` tag (its key when decoded from TOML) and a
// `json` tag (its key when encoded as JSON). Fields tagged `json:"-"`
// (ProductAuth, SessionAuth, MetricsReportInfluxdbPassword) are left out of
// JSON output. HostProxy and HostAdmin are tagged "-" for both formats, so
// they are neither read from TOML nor written to JSON.
type Config struct {
	// Proxy/admin protocol and address settings.
	ProtoType string `toml:"proto_type" json:"proto_type"`
	ProxyAddr string `toml:"proxy_addr" json:"proxy_addr"`
	AdminAddr string `toml:"admin_addr" json:"admin_addr"`

	// Excluded from both TOML and JSON.
	// NOTE(review): presumably filled in by the program at runtime — confirm.
	HostProxy string `toml:"-" json:"-"`
	HostAdmin string `toml:"-" json:"-"`

	// Jodis settings.
	JodisName       string            `toml:"jodis_name" json:"jodis_name"`
	JodisAddr       string            `toml:"jodis_addr" json:"jodis_addr"`
	JodisAuth       string            `toml:"jodis_auth" json:"jodis_auth"`
	JodisTimeout    timesize.Duration `toml:"jodis_timeout" json:"jodis_timeout"`
	JodisCompatible bool              `toml:"jodis_compatible" json:"jodis_compatible"`

	// Product name and auth settings; the two auth values are hidden from JSON.
	ProductName string `toml:"product_name" json:"product_name"`
	ProductAuth string `toml:"product_auth" json:"-"`
	SessionAuth string `toml:"session_auth" json:"-"`

	// Proxy-wide settings.
	ProxyDataCenter      string         `toml:"proxy_datacenter" json:"proxy_datacenter"`
	ProxyMaxClients      int            `toml:"proxy_max_clients" json:"proxy_max_clients"`
	ProxyMaxOffheapBytes bytesize.Int64 `toml:"proxy_max_offheap_size" json:"proxy_max_offheap_size"`
	ProxyHeapPlaceholder bytesize.Int64 `toml:"proxy_heap_placeholder" json:"proxy_heap_placeholder"`

	// Backend connection settings.
	BackendPingPeriod      timesize.Duration `toml:"backend_ping_period" json:"backend_ping_period"`
	BackendRecvBufsize     bytesize.Int64    `toml:"backend_recv_bufsize" json:"backend_recv_bufsize"`
	BackendRecvTimeout     timesize.Duration `toml:"backend_recv_timeout" json:"backend_recv_timeout"`
	BackendSendBufsize     bytesize.Int64    `toml:"backend_send_bufsize" json:"backend_send_bufsize"`
	BackendSendTimeout     timesize.Duration `toml:"backend_send_timeout" json:"backend_send_timeout"`
	BackendMaxPipeline     int               `toml:"backend_max_pipeline" json:"backend_max_pipeline"`
	BackendPrimaryOnly     bool              `toml:"backend_primary_only" json:"backend_primary_only"`
	BackendPrimaryParallel int               `toml:"backend_primary_parallel" json:"backend_primary_parallel"`
	BackendReplicaParallel int               `toml:"backend_replica_parallel" json:"backend_replica_parallel"`
	BackendKeepAlivePeriod timesize.Duration `toml:"backend_keepalive_period" json:"backend_keepalive_period"`
	BackendNumberDatabases int32             `toml:"backend_number_databases" json:"backend_number_databases"`

	// Client session settings.
	SessionRecvBufsize     bytesize.Int64    `toml:"session_recv_bufsize" json:"session_recv_bufsize"`
	SessionRecvTimeout     timesize.Duration `toml:"session_recv_timeout" json:"session_recv_timeout"`
	SessionSendBufsize     bytesize.Int64    `toml:"session_send_bufsize" json:"session_send_bufsize"`
	SessionSendTimeout     timesize.Duration `toml:"session_send_timeout" json:"session_send_timeout"`
	SessionMaxPipeline     int               `toml:"session_max_pipeline" json:"session_max_pipeline"`
	SessionKeepAlivePeriod timesize.Duration `toml:"session_keepalive_period" json:"session_keepalive_period"`
	SessionBreakOnFailure  bool              `toml:"session_break_on_failure" json:"session_break_on_failure"`

	// Metrics reporting settings; the InfluxDB password is hidden from JSON.
	MetricsReportServer           string            `toml:"metrics_report_server" json:"metrics_report_server"`
	MetricsReportPeriod           timesize.Duration `toml:"metrics_report_period" json:"metrics_report_period"`
	MetricsReportInfluxdbServer   string            `toml:"metrics_report_influxdb_server" json:"metrics_report_influxdb_server"`
	MetricsReportInfluxdbPeriod   timesize.Duration `toml:"metrics_report_influxdb_period" json:"metrics_report_influxdb_period"`
	MetricsReportInfluxdbUsername string            `toml:"metrics_report_influxdb_username" json:"metrics_report_influxdb_username"`
	MetricsReportInfluxdbPassword string            `toml:"metrics_report_influxdb_password" json:"-"`
	MetricsReportInfluxdbDatabase string            `toml:"metrics_report_influxdb_database" json:"metrics_report_influxdb_database"`
	MetricsReportStatsdServer     string            `toml:"metrics_report_statsd_server" json:"metrics_report_statsd_server"`
	MetricsReportStatsdPeriod     timesize.Duration `toml:"metrics_report_statsd_period" json:"metrics_report_statsd_period"`
	MetricsReportStatsdPrefix     string            `toml:"metrics_report_statsd_prefix" json:"metrics_report_statsd_prefix"`
}

func NewDefaultConfig

func NewDefaultConfig() *Config

func (*Config) LoadFromFile

func (c *Config) LoadFromFile(path string) error

func (*Config) String

func (c *Config) String() string

func (*Config) Validate

func (c *Config) Validate() error

type Delay

// Delay is the interface for a resettable delay source.
// *DelayExp2 provides all four methods and therefore satisfies it.
type Delay interface {
	// Reset resets the delay.
	// NOTE(review): presumably back to its initial value — confirm.
	Reset()
	// After returns a receive-only channel of time.Time.
	// NOTE(review): presumably it fires once the current delay has elapsed,
	// like time.After — confirm.
	After() <-chan time.Time
	// Sleep presumably blocks for the current delay — confirm.
	Sleep()
	// SleepWithCancel is like Sleep but takes a canceled callback.
	// NOTE(review): presumably the callback is polled and a true result ends
	// the sleep early — confirm.
	SleepWithCancel(canceled func() bool)
}

type DelayExp2

// DelayExp2 is a delay described by integer bounds, a current value and a
// time unit. Its pointer type provides After, Reset, Sleep and
// SleepWithCancel, plus NextValue.
// NOTE(review): the name suggests the value doubles on each step between Min
// and Max, with the actual wait being Value * Unit — confirm.
type DelayExp2 struct {
	Min, Max int
	Value    int
	Unit     time.Duration
}

func (*DelayExp2) After

func (d *DelayExp2) After() <-chan time.Time

func (*DelayExp2) NextValue

func (d *DelayExp2) NextValue() int

func (*DelayExp2) Reset

func (d *DelayExp2) Reset()

func (*DelayExp2) Sleep

func (d *DelayExp2) Sleep()

func (*DelayExp2) SleepWithCancel

func (d *DelayExp2) SleepWithCancel(canceled func() bool)

type Jodis

// Jodis is an opaque type: it exposes no exported fields.
// Obtain one with NewJodis and use it through its methods.
type Jodis struct {
	// contains filtered or unexported fields
}

func NewJodis

func NewJodis(c models.Client, p *models.Proxy) *Jodis

func (*Jodis) Close

func (j *Jodis) Close() error

func (*Jodis) Data

func (j *Jodis) Data() string

func (*Jodis) IsClosed

func (j *Jodis) IsClosed() bool

func (*Jodis) IsWatching

func (j *Jodis) IsWatching() bool

func (*Jodis) Path

func (j *Jodis) Path() string

func (*Jodis) Rewatch

func (j *Jodis) Rewatch() (<-chan struct{}, error)

func (*Jodis) Start

func (j *Jodis) Start()

type OpFlag

// OpFlag is a 32-bit flag set with the query methods IsMasterOnly,
// IsNotAllowed and IsReadOnly.
// NOTE(review): presumably composed from the FlagWrite, FlagMasterOnly,
// FlagMayWrite and FlagNotAllow bits — confirm.
type OpFlag uint32

func (OpFlag) IsMasterOnly

func (f OpFlag) IsMasterOnly() bool

func (OpFlag) IsNotAllowed

func (f OpFlag) IsNotAllowed() bool

func (OpFlag) IsReadOnly

func (f OpFlag) IsReadOnly() bool

type OpInfo

// OpInfo pairs a name with an OpFlag.
// NOTE(review): presumably one entry per supported command — confirm.
type OpInfo struct {
	Name string
	Flag OpFlag
}

type OpStats

// OpStats is a set of counters keyed by OpStr; GetOpStatsAll returns a slice
// of them. The json tags give the key names used when it is encoded.
// NOTE(review): Usecs/UsecsPercall are presumably total and per-call
// durations in microseconds — confirm.
type OpStats struct {
	OpStr        string `json:"opstr"`
	Calls        int64  `json:"calls"`
	Usecs        int64  `json:"usecs"`
	UsecsPercall int64  `json:"usecs_percall"`
	Fails        int64  `json:"fails"`
	RedisErrType int64  `json:"redis_errtype"`
}

func GetOpStatsAll

func GetOpStatsAll() []*OpStats

type Overview

// Overview bundles version strings with the proxy's config, model, stats and
// slots; it is the result type of Proxy.Overview and ApiClient.Overview.
// Version and Compile are always encoded; the other four fields carry
// omitempty, so a nil pointer or empty slice is omitted from the JSON.
type Overview struct {
	Version string         `json:"version"`
	Compile string         `json:"compile"`
	Config  *Config        `json:"config,omitempty"`
	Model   *models.Proxy  `json:"model,omitempty"`
	Stats   *Stats         `json:"stats,omitempty"`
	Slots   []*models.Slot `json:"slots,omitempty"`
}

type Proxy

// Proxy is an opaque type: it exposes no exported fields.
// Obtain one with New(config) and use it through its methods.
type Proxy struct {
	// contains filtered or unexported fields
}

func New

func New(config *Config) (*Proxy, error)

func (*Proxy) Close

func (s *Proxy) Close() error

func (*Proxy) Config

func (s *Proxy) Config() *Config

func (*Proxy) FillSlot

func (s *Proxy) FillSlot(m *models.Slot) error

func (*Proxy) FillSlots

func (s *Proxy) FillSlots(slots []*models.Slot) error

func (*Proxy) GetSentinels

func (s *Proxy) GetSentinels() ([]string, map[int]string)

func (*Proxy) HasSwitched

func (s *Proxy) HasSwitched() bool

func (*Proxy) IsClosed

func (s *Proxy) IsClosed() bool

func (*Proxy) IsOnline

func (s *Proxy) IsOnline() bool

func (*Proxy) Model

func (s *Proxy) Model() *models.Proxy

func (*Proxy) Overview

func (s *Proxy) Overview(flags StatsFlags) *Overview

func (*Proxy) RewatchSentinels

func (s *Proxy) RewatchSentinels() error

func (*Proxy) SetSentinels

func (s *Proxy) SetSentinels(servers []string) error

func (*Proxy) Slots

func (s *Proxy) Slots() []*models.Slot

func (*Proxy) Start

func (s *Proxy) Start() error

func (*Proxy) Stats

func (s *Proxy) Stats(flags StatsFlags) *Stats

func (*Proxy) SwitchMasters

func (s *Proxy) SwitchMasters(masters map[int]string) error

func (*Proxy) XAuth

func (s *Proxy) XAuth() string

type Request

// Request carries one command together with its bookkeeping and result.
//
// OpFlag and *redis.Resp are embedded, so their exported methods and fields
// are promoted onto Request. Because *redis.Resp is an embedded pointer,
// using a promoted member while it is nil will panic.
type Request struct {
	// Multi is a slice of redis.Resp pointers.
	// NOTE(review): presumably the multi-bulk arguments of the command — confirm.
	Multi []*redis.Resp
	// Batch and Group are wait groups shared through pointers.
	// NOTE(review): what each one waits for is not visible here — confirm.
	Batch *sync.WaitGroup
	Group *sync.WaitGroup

	// NOTE(review): presumably the flag that IsBroken reports — confirm.
	Broken *atomic2.Bool

	OpStr string
	OpFlag

	Database int32
	// NOTE(review): presumably a timestamp in Unix nanoseconds — confirm.
	UnixNano int64

	*redis.Resp
	Err error

	// Coalesce is an optional callback returning an error.
	// NOTE(review): presumably merges sub-request results into this request — confirm.
	Coalesce func() error
}

func (*Request) IsBroken

func (r *Request) IsBroken() bool

func (*Request) MakeSubRequest

func (r *Request) MakeSubRequest(n int) []Request

func (*Request) Seed16

func (r *Request) Seed16() uint

type RequestChan

// RequestChan is an opaque type: it exposes no exported fields.
// Obtain one with NewRequestChan or NewRequestChanBuffer and use it through
// its methods (PushBack, PopFront, PopFrontAll, ...).
type RequestChan struct {
	// contains filtered or unexported fields
}

func NewRequestChan

func NewRequestChan() *RequestChan

func NewRequestChanBuffer

func NewRequestChanBuffer(n int) *RequestChan

func (*RequestChan) Buffered

func (c *RequestChan) Buffered() int

func (*RequestChan) Close

func (c *RequestChan) Close()

func (*RequestChan) IsEmpty

func (c *RequestChan) IsEmpty() bool

func (*RequestChan) PopFront

func (c *RequestChan) PopFront() (*Request, bool)

func (*RequestChan) PopFrontAll

func (c *RequestChan) PopFrontAll(onRequest func(r *Request) error) error

func (*RequestChan) PopFrontAllVoid

func (c *RequestChan) PopFrontAllVoid(onRequest func(r *Request))

func (*RequestChan) PushBack

func (c *RequestChan) PushBack(r *Request) int

type Router

// Router is an opaque type: it exposes no exported fields.
// Obtain one with NewRouter and use it through its methods.
type Router struct {
	// contains filtered or unexported fields
}

func NewRouter

func NewRouter(config *Config) *Router

func (*Router) Close

func (s *Router) Close()

func (*Router) FillSlot

func (s *Router) FillSlot(m *models.Slot) error

func (*Router) GetSlot

func (s *Router) GetSlot(id int) *models.Slot

func (*Router) GetSlots

func (s *Router) GetSlots() []*models.Slot

func (*Router) HasSwitched

func (s *Router) HasSwitched() bool

func (*Router) KeepAlive

func (s *Router) KeepAlive() error

func (*Router) Start

func (s *Router) Start()

func (*Router) SwitchMasters

func (s *Router) SwitchMasters(masters map[int]string) error

type RuntimeStats

// RuntimeStats is a JSON-serializable snapshot of runtime counters, grouped
// into the nested objects "general", "heap" and "gc" plus four top-level
// values. Stats.Runtime points to one.
// NOTE(review): the field names match those of runtime.MemStats and the
// runtime package's counters, which is presumably where the values come
// from — confirm.
type RuntimeStats struct {
	General struct {
		Alloc   uint64 `json:"alloc"`
		Sys     uint64 `json:"sys"`
		Lookups uint64 `json:"lookups"`
		Mallocs uint64 `json:"mallocs"`
		Frees   uint64 `json:"frees"`
	} `json:"general"`

	Heap struct {
		Alloc   uint64 `json:"alloc"`
		Sys     uint64 `json:"sys"`
		Idle    uint64 `json:"idle"`
		Inuse   uint64 `json:"inuse"`
		Objects uint64 `json:"objects"`
	} `json:"heap"`

	GC struct {
		Num          uint32  `json:"num"`
		CPUFraction  float64 `json:"cpu_fraction"`
		TotalPauseMs uint64  `json:"total_pausems"`
	} `json:"gc"`

	NumProcs      int   `json:"num_procs"`
	NumGoroutines int   `json:"num_goroutines"`
	NumCgoCall    int64 `json:"num_cgo_call"`
	MemOffheap    int64 `json:"mem_offheap"`
}

type Session

// Session wraps a redis.Conn together with per-session counters; create one
// with NewSession. It also has unexported fields.
type Session struct {
	Conn *redis.Conn

	// NOTE(review): presumably the number of operations handled so far — confirm.
	Ops int64

	// NOTE(review): presumably Unix timestamps of session creation and of the
	// most recent operation; the unit (seconds vs. nanoseconds) is not visible
	// here — confirm.
	CreateUnix int64
	LastOpUnix int64
	// contains filtered or unexported fields
}

func NewSession

func NewSession(sock net.Conn, config *Config) *Session

func (*Session) CloseReaderWithError

func (s *Session) CloseReaderWithError(err error) error

func (*Session) CloseWithError

func (s *Session) CloseWithError(err error) error

func (*Session) Start

func (s *Session) Start(d *Router)

func (*Session) String

func (s *Session) String() string

type Slot

// Slot is an opaque type: it exposes no exported fields, and no exported
// constructor or methods for it appear in this listing.
type Slot struct {
	// contains filtered or unexported fields
}

type Stats

// Stats is the JSON-serializable status report returned by Proxy.Stats and
// ApiClient.Stats. The json tags define the key layout. Fields tagged
// omitempty (Sentinels.*, Ops.Cmd, Rusage.Raw, Runtime) are dropped from the
// output when empty or nil.
// NOTE(review): which sections get filled presumably depends on the
// StatsFlags passed by the caller — confirm.
type Stats struct {
	Online bool `json:"online"`
	Closed bool `json:"closed"`

	Sentinels struct {
		Servers  []string          `json:"servers,omitempty"`
		Masters  map[string]string `json:"masters,omitempty"`
		Switched bool              `json:"switched,omitempty"`
	} `json:"sentinels"`

	Ops struct {
		Total int64 `json:"total"`
		Fails int64 `json:"fails"`
		Redis struct {
			Errors int64 `json:"errors"`
		} `json:"redis"`
		QPS int64      `json:"qps"`
		Cmd []*OpStats `json:"cmd,omitempty"`
	} `json:"ops"`

	Sessions struct {
		Total int64 `json:"total"`
		Alive int64 `json:"alive"`
	} `json:"sessions"`

	Rusage struct {
		// Now is a string here, unlike SysUsage.Now, which is a time.Time.
		Now string       `json:"now"`
		CPU float64      `json:"cpu"`
		Mem int64        `json:"mem"`
		Raw *utils.Usage `json:"raw,omitempty"`
	} `json:"rusage"`

	Backend struct {
		PrimaryOnly bool `json:"primary_only"`
	} `json:"backend"`

	Runtime *RuntimeStats `json:"runtime,omitempty"`
}

type StatsFlags

// StatsFlags is a 32-bit mask. Its defined values are StatsCmds, StatsSlots,
// StatsRuntime and StatsFull; HasBit tests a value against a mask.
type StatsFlags uint32

func (StatsFlags) HasBit

func (s StatsFlags) HasBit(m StatsFlags) bool

type SysUsage

// SysUsage is the value returned by GetSysUsage: a time, a CPU figure and an
// embedded *utils.Usage whose exported members are promoted onto SysUsage.
// NOTE(review): the unit and range of CPU are not visible here — confirm.
type SysUsage struct {
	Now time.Time
	CPU float64
	*utils.Usage
}

func GetSysUsage

func GetSysUsage() *SysUsage

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL