xing

v0.0.0-...-433885b Latest
Warning

This package is not in the latest version of its module.

Published: Jan 5, 2021 License: Apache-2.0 Imports: 18 Imported by: 0

README

About

Designed to be pronounced like "cross-ing", this is currently a narrow-minded RPC library that helps build a particular style of microservices.

Essentially, it is RabbitMQ + protobuf with some pre-defined rules on message topics.

Credit

This started as an opinionated usage of the amqp library. As such, it draws on the following fine projects, especially cony:

  • cony
    • wrapper around amqp in declarative style
  • coworkers
    • A RabbitMQ Microservice Framework in Node.js
  • relay
    • Golang framework for simple message passing using an AMQP broker

Later on, somebody suggested that xing should provide higher-level functionality like gRPC does. Initially I was against the idea because 1) I don't really like code generators and 2) I don't really like hiding the fact that a seemingly simple function call is actually a remote call. But I added the functionality anyway, again drawing on the following fine projects, especially go-micro:

  • go-micro
    • A feature-rich, flexible microservice framework with very good abstractions. The companion protobuf compiler plugin for xing is a fork of the one from go-micro; the WIP service registration builds directly on top of the registry package.
  • go-kit
    • A standard library for microservices.
  • rpcx
    • An RPC service framework based on net/rpc, like Alibaba Dubbo and Weibo Motan. One of the best-performing RPC frameworks.

Documentation

Constants

const (
	// Type
	Command = "command"
	Event   = "event"
	Result  = "result"

	// Event
	Register = "Register"

	// Exchanges
	RPCExchange   = "xing.rpc"
	EventExchange = "xing.event"

	// Client Types
	ProducerClient      = "producer"
	ServiceClient       = "service"
	EventHandlerClient  = "event_handler"
	StreamHandlerClient = "stream_handler"

	// Defaults
	RPCTTL         = int64(1)
	EVTTTL         = int64(15 * 60 * 1000)     // 15 minutes
	STRMTTL        = int64(60 * 1000)          // 1 minute
	ResultQueueTTL = int64(10 * 60 * 1000)     // 10 minutes
	QueueTTL       = int64(3 * 60 * 60 * 1000) // 3 hours

	// Threshold
	MinHeatbeat = 3

	// Threading
	PoolSize = 1000
	NWorker  = 5
)
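
The TTL defaults above are plain int64 values. Judging by the inline comments they appear to be milliseconds, so a small conversion helper can make them easier to read in logs or configuration. The following is a minimal sketch under that assumption; `ttlDuration` is a hypothetical helper and is not part of xing.

```go
package main

import (
	"fmt"
	"time"
)

// Values copied from the constants listed above. Treating them as
// milliseconds is an assumption based on the inline comments.
const (
	EVTTTL         = int64(15 * 60 * 1000)
	STRMTTL        = int64(60 * 1000)
	ResultQueueTTL = int64(10 * 60 * 1000)
	QueueTTL       = int64(3 * 60 * 60 * 1000)
)

// ttlDuration is a hypothetical helper: it interprets a TTL constant as
// milliseconds and returns the equivalent time.Duration.
func ttlDuration(ms int64) time.Duration {
	return time.Duration(ms) * time.Millisecond
}

func main() {
	fmt.Println(ttlDuration(EVTTTL))         // 15m0s
	fmt.Println(ttlDuration(STRMTTL))        // 1m0s
	fmt.Println(ttlDuration(ResultQueueTTL)) // 10m0s
	fmt.Println(ttlDuration(QueueTTL))       // 3h0m0s
}
```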

Variables

This section is empty.

Functions

func Methods

func Methods(service string, v interface{}) (map[string]reflect.Value, map[string]reflect.Type, map[string]reflect.Type)

Methods ...

Types

type CallOption

type CallOption struct {
}

CallOption ...

type Client

type Client struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Client is a wrapper struct for the amqp client.

func NewClient

func NewClient(name string, url string, opts ...ClientOpt) (*Client, error)

NewClient creates an RPC/event client. FIXME: should restrict client name to be 3 segments

func NewEventHandler

func NewEventHandler(name string, url string, opts ...ClientOpt) (*Client, error)

NewEventHandler creates a new event handler.

func NewService

func NewService(name string, url string, opts ...ClientOpt) (*Client, error)

NewService creates an RPC service. If the name has only 2 segments, different service instances are balanced.
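
The notes on NewClient and NewService distinguish names by their number of segments. The sketch below shows one way to count segments before picking a constructor. It assumes segments are separated by "." (the separator is not stated in this documentation), and both `segmentCount` and `isBalancedName` are hypothetical helpers, not xing functions.

```go
package main

import (
	"fmt"
	"strings"
)

// segmentCount is a hypothetical helper. It assumes name segments are
// separated by "." (an assumption; the separator is not documented here).
func segmentCount(name string) int {
	if name == "" {
		return 0
	}
	return len(strings.Split(name, "."))
}

// isBalancedName reports whether a name has exactly 2 segments, the case
// in which the NewService note says instances are balanced.
func isBalancedName(name string) bool {
	return segmentCount(name) == 2
}

func main() {
	// The example names are made up for illustration.
	for _, n := range []string{"demo.greeter", "demo.greeter.node1"} {
		fmt.Printf("%s: segments=%d balanced=%v\n", n, segmentCount(n), isBalancedName(n))
	}
}
```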

func NewStreamHandler

func NewStreamHandler(name string, url string, opts ...ClientOpt) (*Client, error)

NewStreamHandler creates a new data stream handler.

func (*Client) Call

func (c *Client) Call(ctx context.Context, target string, method string, payload interface{}, sync bool) (interface{}, error)

Call invokes a remote method. Should not be called externally.

func (*Client) Close

func (c *Client) Close()

Close shuts down a client or server.

func (*Client) NewHandler

func (c *Client) NewHandler(service string, v interface{})

NewHandler registers a handler for a protocol. Called from generated code.

func (*Client) Notify

func (c *Client) Notify(target string, event string, payload interface{}) error

Notify sends an event.

func (*Client) Register

func (c *Client) Register(address string, port int, tags map[string]string, ttl time.Duration) error

Register starts periodic registration to a database such as etcd or consul.

func (*Client) Respond

func (c *Client) Respond(delivery amqp.Delivery, command string, payload interface{}) error

Respond is called by the RPC server. Should not be called externally.

func (*Client) Run

func (c *Client) Run() error

Run starts a server with the default context.

func (*Client) RunWithContext

func (c *Client) RunWithContext(ctx context.Context) error

RunWithContext starts an RPC/event server. This is a blocking call.

type ClientOpt

type ClientOpt func(*Client)

ClientOpt ...
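
ClientOpt has the shape of the functional-options pattern: each option is a function that mutates the client while it is being constructed. Since the real Client fields are unexported, the sketch below uses a hypothetical stand-in struct (`client`) and hypothetical options (`withBrokerTimeout`, `withInterests`) to show how such options are typically defined and applied. The default values are assumptions too.

```go
package main

import "fmt"

// client is a hypothetical stand-in for xing's Client; the real struct has
// unexported fields that are not shown in this documentation.
type client struct {
	name      string
	retries   int
	interval  int
	interests []string
}

// clientOpt mirrors the shape of ClientOpt: a function that mutates a client.
type clientOpt func(*client)

// withBrokerTimeout is an illustrative option, loosely modeled on
// SetBrokerTimeout.
func withBrokerTimeout(count, interval int) clientOpt {
	return func(c *client) {
		c.retries = count
		c.interval = interval
	}
}

// withInterests is an illustrative option, loosely modeled on SetInterets.
func withInterests(topics ...string) clientOpt {
	return func(c *client) {
		c.interests = append(c.interests, topics...)
	}
}

// newClient sets defaults first, then applies each option in order, so
// later options override earlier ones.
func newClient(name string, opts ...clientOpt) *client {
	c := &client{name: name, retries: 3, interval: 5} // defaults are assumptions
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newClient("demo", withBrokerTimeout(10, 2), withInterests("a.b", "c.d"))
	fmt.Println(c.name, c.retries, c.interval, c.interests)
}
```

The appeal of this design is that new settings can be added without changing the constructor signature, which matches how NewClient, NewService and the handler constructors all accept `opts ...ClientOpt`.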

func SetBrokerTimeout

func SetBrokerTimeout(count, interval int) ClientOpt

SetBrokerTimeout sets a timeout for a server to limit broker reconnect attempts. count: number of retries; interval: number of seconds between retries.
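
To illustrate what a count/interval pair means in practice, here is a minimal sketch of a bounded retry loop. It is not xing's reconnect logic, which is not shown in this documentation. `retryConnect`, `errUnavailable` and the `connect` callback are hypothetical, and treating count as the total number of attempts is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnavailable is a hypothetical error used to simulate a broker outage.
var errUnavailable = errors.New("broker unavailable")

// retryConnect calls connect up to count times, sleeping for interval
// between failed attempts. It returns nil on the first success.
func retryConnect(count int, interval time.Duration, connect func() error) error {
	if count < 1 {
		count = 1 // always try at least once
	}
	var err error
	for attempt := 1; attempt <= count; attempt++ {
		if err = connect(); err == nil {
			return nil
		}
		if attempt < count {
			time.Sleep(interval)
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", count, err)
}

func main() {
	calls := 0
	// Simulated broker that becomes reachable on the third attempt.
	err := retryConnect(5, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errUnavailable
		}
		return nil
	})
	fmt.Println(calls, err)
}
```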

func SetHealthChecker

func SetHealthChecker(hc HealthChecker) ClientOpt

SetHealthChecker ...

func SetIdentifier

func SetIdentifier(id Identifier) ClientOpt

SetIdentifier sets the identifier of the instance. Default is a random string.

func SetInterets

func SetInterets(topic ...string) ClientOpt

SetInterets subscribes to events. Must be called for an event handler.

func SetRegistrator

func SetRegistrator(reg Registrator) ClientOpt

SetRegistrator ...

func SetSerializer

func SetSerializer(ser Serializer) ClientOpt

SetSerializer sets the data serializer. Default is protobuf.

func SetTLSConfig

func SetTLSConfig(cfg *tls.Config) ClientOpt

SetTLSConfig configures TLS connection to a server.

type DumbHealthChecker

type DumbHealthChecker struct {
}

DumbHealthChecker ...

func (*DumbHealthChecker) Healthy

func (dhc *DumbHealthChecker) Healthy() bool

Healthy ...

type HealthChecker

type HealthChecker interface {
	Healthy() bool
}

HealthChecker ...
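
Because HealthChecker is a single-method interface, a custom checker is straightforward to write. The sketch below redeclares the interface locally so that it compiles on its own. `flagHealthChecker` is a hypothetical implementation that the application flips when it detects a problem; how xing consumes the result is not described in this documentation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// HealthChecker is redeclared here so the sketch is self-contained; it
// matches the interface shown above.
type HealthChecker interface {
	Healthy() bool
}

// flagHealthChecker is a hypothetical checker backed by an atomic flag,
// so it is safe to read and update from different goroutines.
type flagHealthChecker struct {
	down int32 // 0 = healthy, 1 = unhealthy
}

// Healthy reports the current state of the flag.
func (f *flagHealthChecker) Healthy() bool {
	return atomic.LoadInt32(&f.down) == 0
}

// SetHealthy lets the application update the state.
func (f *flagHealthChecker) SetHealthy(ok bool) {
	v := int32(1)
	if ok {
		v = 0
	}
	atomic.StoreInt32(&f.down, v)
}

func main() {
	hc := &flagHealthChecker{}
	var checker HealthChecker = hc
	fmt.Println(checker.Healthy()) // true
	hc.SetHealthy(false)
	fmt.Println(checker.Healthy()) // false
}
```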

func DefaultHealthChecker

func DefaultHealthChecker() HealthChecker

DefaultHealthChecker ...

type Identifier

type Identifier interface {
	InstanceID() string
}

Identifier ...
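
A custom Identifier only needs to return a string. As a minimal sketch, the hypothetical `envIdentifier` below reads the instance ID from an environment variable and falls back to a fixed value. The interface is redeclared so the example stands alone, and the variable name used in `main` is just an illustration.

```go
package main

import (
	"fmt"
	"os"
)

// Identifier is redeclared here so the sketch is self-contained; it
// matches the interface shown above.
type Identifier interface {
	InstanceID() string
}

// envIdentifier is a hypothetical Identifier that takes the instance ID
// from an environment variable, with a fallback when it is unset or empty.
type envIdentifier struct {
	key      string
	fallback string
}

// InstanceID returns the value of the configured variable, or the fallback.
func (e envIdentifier) InstanceID() string {
	if v, ok := os.LookupEnv(e.key); ok && v != "" {
		return v
	}
	return e.fallback
}

func main() {
	var id Identifier = envIdentifier{key: "HOSTNAME", fallback: "local"}
	fmt.Println(id.InstanceID())
}
```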

type InstanceSelector

type InstanceSelector struct {
	// contains filtered or unexported fields
}

InstanceSelector ...

func (*InstanceSelector) Select

func (is *InstanceSelector) Select(services []*Service) *Service

Select ...

type JSONSerializer

type JSONSerializer struct {
	// contains filtered or unexported fields
}

JSONSerializer ...

func (*JSONSerializer) ContentType

func (s *JSONSerializer) ContentType() string

ContentType ...

func (*JSONSerializer) DefaultValue

func (s *JSONSerializer) DefaultValue() interface{}

DefaultValue ...

func (*JSONSerializer) Marshal

func (s *JSONSerializer) Marshal(data interface{}) ([]byte, error)

Marshal ...

func (*JSONSerializer) Unmarshal

func (s *JSONSerializer) Unmarshal(data []byte, v interface{}) error

Unmarshal ...

type NodeIdentifier

type NodeIdentifier struct {
}

NodeIdentifier ...

func (*NodeIdentifier) InstanceID

func (p *NodeIdentifier) InstanceID() string

InstanceID ...

type NoneIdentifier

type NoneIdentifier struct {
}

NoneIdentifier ...

func (*NoneIdentifier) InstanceID

func (p *NoneIdentifier) InstanceID() string

InstanceID ...

type PlainSerializer

type PlainSerializer struct {
	// contains filtered or unexported fields
}

PlainSerializer ...

func (*PlainSerializer) ContentType

func (s *PlainSerializer) ContentType() string

ContentType ...

func (*PlainSerializer) DefaultValue

func (s *PlainSerializer) DefaultValue() interface{}

DefaultValue ...

func (*PlainSerializer) Marshal

func (s *PlainSerializer) Marshal(data interface{}) ([]byte, error)

Marshal ...

func (*PlainSerializer) Unmarshal

func (s *PlainSerializer) Unmarshal(data []byte, v interface{}) error

Unmarshal ...

type PodIdentifier

type PodIdentifier struct {
}

PodIdentifier ...

func (*PodIdentifier) InstanceID

func (p *PodIdentifier) InstanceID() string

InstanceID ...

type ProtoSerializer

type ProtoSerializer struct {
	// contains filtered or unexported fields
}

ProtoSerializer ...

func (*ProtoSerializer) ContentType

func (s *ProtoSerializer) ContentType() string

ContentType ...

func (*ProtoSerializer) DefaultValue

func (s *ProtoSerializer) DefaultValue() interface{}

DefaultValue ...

func (*ProtoSerializer) Marshal

func (s *ProtoSerializer) Marshal(data interface{}) ([]byte, error)

Marshal ...

func (*ProtoSerializer) Unmarshal

func (s *ProtoSerializer) Unmarshal(data []byte, v interface{}) error

Unmarshal ...

type RandomHealthChecker

type RandomHealthChecker struct {
}

RandomHealthChecker ...

func (*RandomHealthChecker) Healthy

func (rhc *RandomHealthChecker) Healthy() bool

Healthy ...

type RandomIdentifier

type RandomIdentifier struct {
}

RandomIdentifier ...

func (*RandomIdentifier) InstanceID

func (p *RandomIdentifier) InstanceID() string

InstanceID ...

type Registrator

type Registrator interface {
	Register(s *Service, ttl time.Duration) error
	Deregister(s *Service) error
	GetService(name string, selector Selector) (*Service, error)
}

Registrator ...

func NewConsulRegistrator

func NewConsulRegistrator(opts ...registry.Option) Registrator

NewConsulRegistrator ...

func NewEtcdRegistrator

func NewEtcdRegistrator(opts ...registry.Option) Registrator

NewEtcdRegistrator ...
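
For tests or local experiments it can be handy to satisfy Registrator without etcd or consul. The following is a minimal in-memory sketch. The `Service`, `Selector` and `Registrator` declarations are copied from this page so the example compiles on its own, while `memRegistrator` and its behavior (ignoring the TTL, returning the first instance when the selector is nil) are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Declarations copied from this page so the sketch is self-contained.
type Service struct {
	Name     string
	Instance string
	Address  string
	Port     int
	Tags     map[string]string
}

type Selector interface {
	Select(s []*Service) *Service
}

type Registrator interface {
	Register(s *Service, ttl time.Duration) error
	Deregister(s *Service) error
	GetService(name string, selector Selector) (*Service, error)
}

// memRegistrator is a hypothetical in-memory Registrator.
type memRegistrator struct {
	mu       sync.Mutex
	services map[string][]*Service
}

var _ Registrator = (*memRegistrator)(nil) // compile-time interface check

func newMemRegistrator() *memRegistrator {
	return &memRegistrator{services: map[string][]*Service{}}
}

// Register stores the service. The ttl is ignored in this sketch.
func (m *memRegistrator) Register(s *Service, ttl time.Duration) error {
	if s == nil {
		return errors.New("nil service")
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.services[s.Name] = append(m.services[s.Name], s)
	return nil
}

// Deregister removes the instance with a matching Name and Instance.
func (m *memRegistrator) Deregister(s *Service) error {
	if s == nil {
		return errors.New("nil service")
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	list := m.services[s.Name]
	for i, cur := range list {
		if cur.Instance == s.Instance {
			m.services[s.Name] = append(list[:i:i], list[i+1:]...)
			return nil
		}
	}
	return errors.New("service not registered")
}

// GetService returns one instance, chosen by selector when it is non-nil.
func (m *memRegistrator) GetService(name string, selector Selector) (*Service, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	list := m.services[name]
	if len(list) == 0 {
		return nil, fmt.Errorf("no instance of %q", name)
	}
	if selector == nil {
		return list[0], nil
	}
	if svc := selector.Select(list); svc != nil {
		return svc, nil
	}
	return nil, fmt.Errorf("no instance of %q matched the selector", name)
}

func main() {
	reg := newMemRegistrator()
	_ = reg.Register(&Service{Name: "demo.greeter", Instance: "a", Address: "10.0.0.1", Port: 5672}, time.Minute)
	svc, err := reg.GetService("demo.greeter", nil)
	fmt.Println(svc.Instance, err)
}
```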

type Request

type Request struct {
	// contains filtered or unexported fields
}

Request ...

type Selector

type Selector interface {
	Select(s []*Service) *Service
}

Selector ...

func NewInstanceSelector

func NewInstanceSelector(id string) Selector

NewInstanceSelector ...

type Serializer

type Serializer interface {
	ContentType() string
	Marshal(data interface{}) ([]byte, error)
	Unmarshal(data []byte, v interface{}) error
	DefaultValue() interface{}
}

Serializer ...
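
The Serializer interface makes the wire format pluggable via SetSerializer. As a minimal sketch of a custom implementation, the hypothetical `gobSerializer` below uses the standard library's encoding/gob. The interface is redeclared so the example compiles alone. The content type string is an assumption, and since the intended semantics of DefaultValue are not documented here, the sketch simply returns nil.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Serializer is redeclared here so the sketch is self-contained; it
// matches the interface shown above.
type Serializer interface {
	ContentType() string
	Marshal(data interface{}) ([]byte, error)
	Unmarshal(data []byte, v interface{}) error
	DefaultValue() interface{}
}

// gobSerializer is a hypothetical Serializer built on encoding/gob.
type gobSerializer struct{}

// ContentType returns an illustrative content type (an assumption).
func (gobSerializer) ContentType() string { return "application/x-gob" }

// Marshal encodes data into a fresh gob stream.
func (gobSerializer) Marshal(data interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(data); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Unmarshal decodes a gob stream into v, which must be a pointer.
func (gobSerializer) Unmarshal(data []byte, v interface{}) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
}

// DefaultValue returns nil; its expected meaning is not documented here.
func (gobSerializer) DefaultValue() interface{} { return nil }

// greeting is a made-up payload type for the round trip below.
type greeting struct {
	Name  string
	Count int
}

func main() {
	var s Serializer = gobSerializer{}
	raw, err := s.Marshal(greeting{Name: "xing", Count: 2})
	if err != nil {
		panic(err)
	}
	var out greeting
	if err := s.Unmarshal(raw, &out); err != nil {
		panic(err)
	}
	fmt.Printf("%s %+v\n", s.ContentType(), out)
}
```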

type Service

type Service struct {
	Name     string
	Instance string
	Address  string
	Port     int
	Tags     map[string]string
}

Service ...

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker ...

func (*Worker) Close

func (w *Worker) Close()

Close ... should be called after the worker has stopped.

func (*Worker) Loop

func (w *Worker) Loop()

Loop ...

func (*Worker) SetDispatcher

func (w *Worker) SetDispatcher(dis wk.Dispatcher)

SetDispatcher ...

Directories

Path Synopsis
hello
Package hello is a generated protocol buffer package.
tls
pkg
