client

package
v3.2.4+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 11, 2020 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
// Names of the rpcx headers; every value carries the "X-RPCX-" prefix.
// NOTE(review): the value of XMessageType is spelled "MesssageType" (three s).
// It is a runtime string, so confirm compatibility with peers before changing it.
const (
	XVersion           = "X-RPCX-Version"
	XMessageType       = "X-RPCX-MesssageType"
	XHeartbeat         = "X-RPCX-Heartbeat"
	XOneway            = "X-RPCX-Oneway"
	XMessageStatusType = "X-RPCX-MessageStatusType"
	XSerializeType     = "X-RPCX-SerializeType"
	XMessageID         = "X-RPCX-MessageID"
	XServicePath       = "X-RPCX-ServicePath"
	XServiceMethod     = "X-RPCX-ServiceMethod"
	XMeta              = "X-RPCX-Meta"
	XErrorMessage      = "X-RPCX-ErrorMessage"
)
View Source
// Buffer sizes for buffered reads and writes; both are 16 KiB.
const (
	// ReaderBuffsize is the buffer size used for the bufio reader.
	ReaderBuffsize = 16 << 10
	// WriterBuffsize is the buffer size used for the bufio writer.
	WriterBuffsize = 16 << 10
)

Variables

View Source
// Errors related to the circuit breaker.
var (
	// ErrBreakerOpen carries the message "breaker open".
	// NOTE(review): presumably returned when a call is rejected because the breaker is open — confirm.
	ErrBreakerOpen    = errors.New("breaker open")
	// ErrBreakerTimeout carries the message "breaker time out".
	// NOTE(review): presumably returned when a call exceeds the breaker's time limit — confirm.
	ErrBreakerTimeout = errors.New("breaker time out")
)
View Source
var (
	// ErrShutdown reports that the connection is shut down.
	ErrShutdown         = errors.New("connection is shut down")
	// ErrUnsupportedCodec reports that a codec is not supported.
	ErrUnsupportedCodec = errors.New("unsupported codec")
)

ErrShutdown indicates that the connection is closed.

View Source
var (
	// ErrXClientShutdown reports that the xclient is shut down.
	ErrXClientShutdown = errors.New("xClient is shut down")
	// ErrXClientNoServer reports that the selector could not find any server.
	ErrXClientNoServer = errors.New("can not found any server")
	// ErrServerUnavailable reports that the selected server is unavailable.
	ErrServerUnavailable = errors.New("selected server is unavilable")
)
View Source
// DefaultOption is a common option configuration for clients: 3 retries,
// the shared default RPC path, a 10 second connect timeout, MsgPack
// serialization, no compression and a 10 millisecond backup latency.
var DefaultOption = Option{
	Retries:        3,
	RPCPath:        share.DefaultRPCPath,
	ConnectTimeout: 10 * time.Second,
	SerializeType:  protocol.MsgPack,
	CompressType:   protocol.None,
	BackupLatency:  10 * time.Millisecond,
}

DefaultOption is a common option configuration for client.

View Source
// InprocessClient is an in-process client for testing. It is created with
// empty services and methods maps.
var InprocessClient = &inprocessClient{
	services: make(map[string]interface{}),
	methods:  make(map[string]*reflect.Value),
}

InprocessClient is an in-process client for testing.

Functions

func Hash

func Hash(key uint64, buckets int32) int32

Hash consistently chooses a hash bucket number in the range [0, buckets) for the given key. buckets must be >= 1.

func HashString

func HashString(s string) uint64

HashString returns the hash value of a string.

func JumpConsistentHash

func JumpConsistentHash(len int, options ...interface{}) int

JumpConsistentHash selects a server by serviceMethod and args

Types

type Breaker

// Breaker is a CircuitBreaker interface.
type Breaker interface {
	// Call takes a function returning an error plus a duration and returns an error.
	// NOTE(review): presumably the duration is a timeout applied to the function — confirm against implementations.
	Call(func() error, time.Duration) error
}

Breaker is a CircuitBreaker interface.

// CircuitBreaker is the default circuit breaker, built with circuit.NewRateBreaker(0.95, 100).
var CircuitBreaker Breaker = circuit.NewRateBreaker(0.95, 100)

CircuitBreaker is a default circuit breaker (RateBreaker(0.95, 100)).

type Call

// Call represents an active RPC.
type Call struct {
	ServicePath   string            // The name of the service to call.
	ServiceMethod string            // The name of the method to call.
	Metadata      map[string]string // Metadata of the call. NOTE(review): presumably sent with the request — confirm.
	ResMetadata   map[string]string // NOTE(review): presumably metadata returned with the response — confirm.
	Args          interface{} // The argument to the function (*struct).
	Reply         interface{} // The reply from the function (*struct).
	Error         error       // After completion, the error status.
	Done          chan *Call  // Strobes when call is complete.
	Raw           bool        // Whether this is a raw message.
}

Call represents an active RPC.

type Client

// Client represents an RPC client.
type Client struct {
	// Conn is the client's network connection.
	Conn net.Conn

	// Plugins is the plugin container attached to this client.
	Plugins PluginContainer

	// ServerMessageChan is a send-only channel of protocol messages.
	// NOTE(review): presumably the channel that receives server requests, as set by
	// RegisterServerMessageChan — confirm.
	ServerMessageChan chan<- *protocol.Message
	// contains filtered or unexported fields
}

Client represents an RPC client.

func NewClient

func NewClient(option Option) *Client

NewClient returns a new Client with the option.

func (*Client) Call

func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error

Call invokes the named function, waits for it to complete, and returns its error status.

func (*Client) Close

func (client *Client) Close() error

Close calls the underlying codec's Close method. If the connection is already shutting down, ErrShutdown is returned.

func (*Client) Connect

func (c *Client) Connect(network, address string) error

Connect connects the server via specified network.

func (*Client) Go

func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call

Go invokes the function asynchronously. It returns the Call structure representing the invocation. The done channel will signal when the call is complete by returning the same Call object. If done is nil, Go will allocate a new channel. If non-nil, done must be buffered or Go will deliberately crash.

func (*Client) IsClosing

func (client *Client) IsClosing() bool

IsClosing reports whether the client is closing.

func (*Client) IsShutdown

func (client *Client) IsShutdown() bool

IsShutdown reports whether the client is shut down.

func (*Client) RegisterServerMessageChan

func (client *Client) RegisterServerMessageChan(ch chan<- *protocol.Message)

RegisterServerMessageChan registers the channel that receives server requests.

func (*Client) SendRaw

func (client *Client) SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)

SendRaw sends raw messages. You don't need to care about args and replies.

func (*Client) UnregisterServerMessageChan

func (client *Client) UnregisterServerMessageChan()

UnregisterServerMessageChan removes ServerMessageChan.

type ConsecCircuitBreaker

type ConsecCircuitBreaker struct {
	// contains filtered or unexported fields
}

ConsecCircuitBreaker is a sliding-window CircuitBreaker with a failure threshold.

func NewConsecCircuitBreaker

func NewConsecCircuitBreaker(failureThreshold uint64, window time.Duration) *ConsecCircuitBreaker

NewConsecCircuitBreaker returns a new ConsecCircuitBreaker.

func (*ConsecCircuitBreaker) Call

func (cb *ConsecCircuitBreaker) Call(fn func() error, d time.Duration) error

Call invokes fn through the circuit breaker.

type ConsistentAddrStrFunction

type ConsistentAddrStrFunction func(options ...interface{}) string

ConsistentAddrStrFunction defines a hash function that returns a service address, like "tcp@127.0.0.1:8970".

type EtcdDiscovery

// EtcdDiscovery is an etcd service discovery.
type EtcdDiscovery struct {

	// RetriesAfterWatchFailed controls retrying after a watch fails:
	// -1 means it always retries watching until etcd is ok, 0 means no retry.
	RetriesAfterWatchFailed int
	// contains filtered or unexported fields
}

EtcdDiscovery is an etcd service discovery. It always returns the registered servers in etcd.

func (EtcdDiscovery) Clone

func (d EtcdDiscovery) Clone(servicePath string) ServiceDiscovery

Clone clones this ServiceDiscovery with new servicePath.

func (*EtcdDiscovery) Close

func (d *EtcdDiscovery) Close()

func (EtcdDiscovery) GetServices

func (d EtcdDiscovery) GetServices() []*KVPair

GetServices returns the servers

func (*EtcdDiscovery) RemoveWatcher

func (d *EtcdDiscovery) RemoveWatcher(ch chan []*KVPair)

func (*EtcdDiscovery) WatchService

func (d *EtcdDiscovery) WatchService() chan []*KVPair

WatchService returns a nil chan.

type FailMode

type FailMode int

FailMode decides how clients act when they fail to invoke services.

const (
	// Failover selects another server automatically.
	Failover FailMode = iota
	// Failfast returns an error immediately.
	Failfast
	// Failtry uses the current client again.
	Failtry
	// Failbackup selects another server if the first server doesn't respond within the specified time, and uses the faster response.
	Failbackup
)

type HashServiceAndArgs

type HashServiceAndArgs func(len int, options ...interface{}) int

HashServiceAndArgs defines a hash function.

type InprocessDiscovery

type InprocessDiscovery struct {
}

InprocessDiscovery is an in-process service discovery. Clients and servers are in one process and communicate without tcp/udp.

func (InprocessDiscovery) Clone

func (d InprocessDiscovery) Clone(servicePath string) ServiceDiscovery

Clone clones this ServiceDiscovery with new servicePath.

func (*InprocessDiscovery) Close

func (d *InprocessDiscovery) Close()

func (InprocessDiscovery) GetServices

func (d InprocessDiscovery) GetServices() []*KVPair

GetServices returns the static server

func (InprocessDiscovery) RemoveWatcher

func (d InprocessDiscovery) RemoveWatcher(ch chan []*KVPair)

func (InprocessDiscovery) WatchService

func (d InprocessDiscovery) WatchService() chan []*KVPair

WatchService returns a nil chan.

type KVPair

// KVPair contains a string key and a string value.
type KVPair struct {
	Key   string
	Value string
}

KVPair contains a key and a string.

type MDNSDiscovery

// MDNSDiscovery is an mDNS service discovery.
type MDNSDiscovery struct {
	Timeout       time.Duration // NOTE(review): presumably the timeout of one mDNS lookup — confirm.
	WatchInterval time.Duration // NOTE(review): presumably the interval between refreshes of the server list — confirm.
	// contains filtered or unexported fields
}

MDNSDiscovery is an mDNS service discovery. It always returns the servers registered via mDNS.

func (MDNSDiscovery) Clone

func (d MDNSDiscovery) Clone(servicePath string) ServiceDiscovery

Clone clones this ServiceDiscovery with new servicePath.

func (*MDNSDiscovery) Close

func (d *MDNSDiscovery) Close()

func (MDNSDiscovery) GetServices

func (d MDNSDiscovery) GetServices() []*KVPair

GetServices returns the servers

func (*MDNSDiscovery) RemoveWatcher

func (d *MDNSDiscovery) RemoveWatcher(ch chan []*KVPair)

func (*MDNSDiscovery) WatchService

func (d *MDNSDiscovery) WatchService() chan []*KVPair

WatchService returns a nil chan.

type MultipleServersDiscovery

type MultipleServersDiscovery struct {
	// contains filtered or unexported fields
}

MultipleServersDiscovery is a multiple-servers service discovery. It always returns the current servers, and users can change the servers dynamically.

func (MultipleServersDiscovery) Clone

func (d MultipleServersDiscovery) Clone(servicePath string) ServiceDiscovery

Clone clones this ServiceDiscovery with new servicePath.

func (*MultipleServersDiscovery) Close

func (d *MultipleServersDiscovery) Close()

func (MultipleServersDiscovery) GetServices

func (d MultipleServersDiscovery) GetServices() []*KVPair

GetServices returns the configured server

func (*MultipleServersDiscovery) RemoveWatcher

func (d *MultipleServersDiscovery) RemoveWatcher(ch chan []*KVPair)

func (*MultipleServersDiscovery) Update

func (d *MultipleServersDiscovery) Update(pairs []*KVPair)

Update is used to update servers at runtime.

func (*MultipleServersDiscovery) WatchService

func (d *MultipleServersDiscovery) WatchService() chan []*KVPair

WatchService returns a nil chan.

type Option

// Option contains all options for creating clients.
type Option struct {
	// Group is used to select the services in the same group. Services set group info in their meta.
	// If it is empty, clients will ignore group.
	Group string

	// Retries is the retry count for sending.
	Retries int

	// TLSConfig is the TLS configuration for tcp and quic.
	TLSConfig *tls.Config
	// Block holds a kcp.BlockCrypt.
	Block interface{}
	// RPCPath is the path used for http connections.
	RPCPath string
	// ConnectTimeout sets the timeout for dialing.
	ConnectTimeout time.Duration
	// ReadTimeout sets the read deadline for underlying net.Conns.
	ReadTimeout time.Duration
	// WriteTimeout sets the write deadline for underlying net.Conns.
	WriteTimeout time.Duration

	// BackupLatency is used for Failbackup mode. rpcx will send another request if the first response doesn't return within BackupLatency.
	BackupLatency time.Duration

	// Breaker is used to configure the CircuitBreaker.
	Breaker Breaker

	// SerializeType and CompressType select the protocol serialization and compression types.
	SerializeType protocol.SerializeType
	CompressType  protocol.CompressType

	// NOTE(review): presumably Heartbeat enables heartbeats and HeartbeatInterval
	// is the time between them — confirm against the client implementation.
	Heartbeat         bool
	HeartbeatInterval time.Duration
}

Option contains all options for creating clients.

type Peer2PeerDiscovery

type Peer2PeerDiscovery struct {
	// contains filtered or unexported fields
}

Peer2PeerDiscovery is a peer-to-peer service discovery. It always returns the static server.

func (Peer2PeerDiscovery) Clone

func (d Peer2PeerDiscovery) Clone(servicePath string) ServiceDiscovery

Clone clones this ServiceDiscovery with new servicePath.

func (*Peer2PeerDiscovery) Close

func (d *Peer2PeerDiscovery) Close()

func (Peer2PeerDiscovery) GetServices

func (d Peer2PeerDiscovery) GetServices() []*KVPair

GetServices returns the static server

func (*Peer2PeerDiscovery) RemoveWatcher

func (d *Peer2PeerDiscovery) RemoveWatcher(ch chan []*KVPair)

func (Peer2PeerDiscovery) WatchService

func (d Peer2PeerDiscovery) WatchService() chan []*KVPair

WatchService returns a nil chan.

type Plugin

type Plugin interface {
}

Plugin is the client plugin interface.

type PluginContainer

// PluginContainer represents a plugin container that defines all methods to
// manage plugins. It also defines all extension points.
type PluginContainer interface {
	// Add, Remove and All manage the plugins held by the container.
	Add(plugin Plugin)
	Remove(plugin Plugin)
	All() []Plugin

	// DoPreCall and DoPostCall are the extension points; their signatures match
	// PreCallPlugin.DoPreCall and PostCallPlugin.DoPostCall.
	DoPreCall(ctx context.Context, servicePath, serviceMethod string, args interface{}) error
	DoPostCall(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, err error) error
}

PluginContainer represents a plugin container that defines all methods to manage plugins. And it also defines all extension points.

type PostCallPlugin

// PostCallPlugin is invoked after the client calls a server.
type PostCallPlugin interface {
	// DoPostCall receives the call's service path, method, args and reply, plus the call's error.
	DoPostCall(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, err error) error
}

PostCallPlugin is invoked after the client calls a server.

type PreCallPlugin

// PreCallPlugin is invoked before the client calls a server.
type PreCallPlugin interface {
	// DoPreCall receives the call's service path, method and args.
	DoPreCall(ctx context.Context, servicePath, serviceMethod string, args interface{}) error
}

PreCallPlugin is invoked before the client calls a server.

type RPCClient

// RPCClient is an interface that defines one client to call one server.
type RPCClient interface {
	// Connect connects to the server via the specified network.
	Connect(network, address string) error
	// Go invokes the function asynchronously and returns the Call representing the invocation.
	Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
	// Call invokes the named function, waits for it to complete, and returns its error status.
	Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error
	// SendRaw sends a raw message.
	SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)
	// Close closes the client.
	Close() error

	// RegisterServerMessageChan registers the channel that receives server requests.
	RegisterServerMessageChan(ch chan<- *protocol.Message)
	// UnregisterServerMessageChan removes the registered channel.
	UnregisterServerMessageChan()

	// IsClosing reports whether the client is closing.
	IsClosing() bool
	// IsShutdown reports whether the client is shut down.
	IsShutdown() bool
}

RPCClient is an interface that defines one client to call one server.

type SelectMode

type SelectMode int

SelectMode defines the algorithm for selecting a service from candidates.

const (
	// RandomSelect selects randomly.
	RandomSelect SelectMode = iota
	// RoundRobin selects by round robin.
	RoundRobin
	// WeightedRoundRobin selects by weighted round robin.
	WeightedRoundRobin
	// WeightedICMP selects by weighted ping time.
	WeightedICMP
	// ConsistentHash selects by hashing.
	ConsistentHash
	// Closest selects the closest server.
	Closest

	// SelectByUser selects by a user-supplied implementation.
	// NOTE(review): this spec has its own expression and no type, so it is an
	// untyped constant 1000 rather than a SelectMode; it is still assignable to SelectMode.
	SelectByUser = 1000
)

func (SelectMode) String

func (s SelectMode) String() string

type Selector

// Selector defines a selector that selects one service from candidates.
type Selector interface {
	// Select returns a string for the given call.
	// NOTE(review): presumably the address of the chosen server — confirm.
	Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string
	// UpdateServer receives a new server map.
	// NOTE(review): presumably keyed by server address with metadata as the value — confirm.
	UpdateServer(servers map[string]string)
}

Selector defines selector that selects one service from candidates.

type ServiceDiscovery

// ServiceDiscovery defines the service discovery interface.
type ServiceDiscovery interface {
	// GetServices returns the servers as key/value pairs.
	GetServices() []*KVPair
	// WatchService returns a channel of server lists.
	WatchService() chan []*KVPair
	// RemoveWatcher takes a channel of the kind returned by WatchService.
	// NOTE(review): presumably stops delivering updates to it — confirm.
	RemoveWatcher(ch chan []*KVPair)
	// Clone clones this ServiceDiscovery with a new servicePath.
	Clone(servicePath string) ServiceDiscovery
	// Close closes the discovery.
	Close()
}

ServiceDiscovery defines ServiceDiscovery of zookeeper, etcd and consul

func NewEtcdDiscovery

func NewEtcdDiscovery(basePath string, servicePath string, etcdAddr []string, options *store.Config) ServiceDiscovery

NewEtcdDiscovery returns a new EtcdDiscovery.

func NewEtcdDiscoveryStore

func NewEtcdDiscoveryStore(basePath string, kv store.Store) ServiceDiscovery

NewEtcdDiscoveryStore returns a new EtcdDiscovery with the specified store.

func NewEtcdDiscoveryTemplate

func NewEtcdDiscoveryTemplate(basePath string, etcdAddr []string, options *store.Config) ServiceDiscovery

NewEtcdDiscoveryTemplate returns a new EtcdDiscovery template.

func NewInprocessDiscovery

func NewInprocessDiscovery() ServiceDiscovery

NewInprocessDiscovery returns a new InprocessDiscovery.

func NewMDNSDiscovery

func NewMDNSDiscovery(service string, timeout time.Duration, watchInterval time.Duration, domain string) ServiceDiscovery

NewMDNSDiscovery returns a new MDNSDiscovery. If domain is empty, use "local." in default.

func NewMDNSDiscoveryTemplate

func NewMDNSDiscoveryTemplate(timeout time.Duration, watchInterval time.Duration, domain string) ServiceDiscovery

NewMDNSDiscoveryTemplate returns a new MDNSDiscovery template.

func NewMultipleServersDiscovery

func NewMultipleServersDiscovery(pairs []*KVPair) ServiceDiscovery

NewMultipleServersDiscovery returns a new MultipleServersDiscovery.

func NewPeer2PeerDiscovery

func NewPeer2PeerDiscovery(server, metadata string) ServiceDiscovery

NewPeer2PeerDiscovery returns a new Peer2PeerDiscovery.

func NewStaticDiscovery

func NewStaticDiscovery(basePath, servicePath, configPath string) ServiceDiscovery

NewStaticDiscovery returns a new StaticDiscovery.

func NewStaticDiscoveryTemplate

func NewStaticDiscoveryTemplate(basePath, configPath string) ServiceDiscovery

NewStaticDiscoveryTemplate returns a new StaticDiscovery template.

type ServiceError

type ServiceError string

ServiceError is an error from server.

func (ServiceError) Error

func (e ServiceError) Error() string

type StaticDiscovery

type StaticDiscovery struct {
	// contains filtered or unexported fields
}

StaticDiscovery is a static service discovery. It always returns the registered servers in static yaml file.

func (*StaticDiscovery) Clone

func (d *StaticDiscovery) Clone(servicePath string) ServiceDiscovery

Clone clones this ServiceDiscovery with new servicePath.

func (*StaticDiscovery) Close

func (d *StaticDiscovery) Close()

func (StaticDiscovery) GetServices

func (d StaticDiscovery) GetServices() []*KVPair

GetServices returns the servers

func (*StaticDiscovery) RemoveWatcher

func (d *StaticDiscovery) RemoveWatcher(ch chan []*KVPair)

func (*StaticDiscovery) WatchService

func (d *StaticDiscovery) WatchService() chan []*KVPair

WatchService returns a nil chan.

type Weighted

// Weighted is a wrapped server with weight.
type Weighted struct {
	Server          string // The wrapped server.
	Weight          int
	CurrentWeight   int
	EffectiveWeight int
}

Weighted is a wrapped server with weight

type XClient

// XClient is an interface that is used by clients with service discovery and
// service governance. One XClient is used only for one service.
type XClient interface {
	// SetPlugins sets the plugin container.
	SetPlugins(plugins PluginContainer)
	// SetSelector sets the selector.
	SetSelector(s Selector)
	// ConfigGeoSelector takes a latitude and a longitude.
	// NOTE(review): presumably the client's own location, used by the Closest select mode — confirm.
	ConfigGeoSelector(latitude, longitude float64)
	// Auth takes an auth string. NOTE(review): presumably a token sent with requests — confirm.
	Auth(auth string)

	// Go, Call, Broadcast and Fork invoke serviceMethod with args and reply;
	// unlike the RPCClient methods they take no servicePath.
	// NOTE(review): how Broadcast and Fork fan out across servers is not shown here — confirm.
	Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error)
	Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
	Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
	Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
	// SendRaw sends a raw message.
	SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)
	// Close closes the client.
	Close() error
}

XClient is an interface that is used by clients with service discovery and service governance. One XClient is used only for one service. You should create multiple XClients for multiple services.

func NewBidirectionalXClient

func NewBidirectionalXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option, serverMessageChan chan<- *protocol.Message) XClient

NewBidirectionalXClient creates a new xclient that can receive notifications from servers.

func NewXClient

func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient

NewXClient creates an XClient that supports service discovery and service governance.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL