reverserpc

package module
v0.0.0-...-4208786 Latest
Published: Apr 16, 2024 License: MIT Imports: 14 Imported by: 0

README

Go reverse RPC

A remote procedure call (RPC) framework designed for connecting to devices remotely. It enables the "server" to call functions provided by the "client".

WARNING

This project is currently under development, and the API may undergo breaking changes. Please use it at your own risk.

Features

  • Supports multiple communication protocols - currently MQTT 3.1/3.1.1 is implemented
  • Allows encoding data in different formats - currently supports JSON and Protobuf
  • Provides monitoring metrics for system insights
  • Implements error handling mechanisms for reliability

TODO

  • OpenTelemetry support
  • MQTT v5 protocol support
  • WebSocket protocol support
  • AMQP protocol support

Installation

go get github.com/xizhibei/go-reverse-rpc@latest

Usage

Server create

import (
    "path"

    "github.com/xizhibei/go-reverse-rpc/mqttpb"
    "github.com/xizhibei/go-reverse-rpc/mqttadapter"
)

// Connect to the MQTT broker.
mqttClient, err := mqttadapter.New("tcp://localhost", "123456-server")
if err != nil {
    panic(err)
}

// The topic resolves to "example/123456/request/+";
// "+" is the MQTT single-level wildcard.
server := mqttpb.NewServer(
    mqttClient,
    path.Join("example", "123456", "request/+"),
)

Client create

import (
    "github.com/xizhibei/go-reverse-rpc/mqttpb"
    "github.com/xizhibei/go-reverse-rpc/mqttadapter"
)

// Connect to the MQTT broker.
mqttClient, err := mqttadapter.New("tcp://localhost", "123456-client")
if err != nil {
    panic(err)
}

// Create the RPC client with the "example" topic prefix and gzip content encoding.
client := mqttpb.New(
    mqttClient,
    "example",
    mqttpb.ContentEncoding_GZIP,
)
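
Register handler on server side

import (
    "time"

    rrpc "github.com/xizhibei/go-reverse-rpc"
)

// method is the RPC method name and Req is your own request type.
server.Register(method, &rrpc.Handler{
    Method: func(c rrpc.Context) {
        // Decode the request payload into req.
        var req Req
        err := c.Bind(&req)
        if err != nil {
            c.ReplyError(rrpc.RPCStatusClientError, err)
            return
        }

        // your business logic ...

        // Echo the request back as the successful result.
        c.ReplyOK(req)
    },
    Timeout: 5 * time.Second,
})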
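
Call on client side

// suite.deviceId is a placeholder for the target device's ID;
// reqParams holds the request and res receives the decoded reply.
var res Req
err := client.Call(context.Background(), suite.deviceId, method, &reqParams, &res)
if err != nil {
    // handle the error
}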

License

Go reverse RPC is released under the MIT license; refer to the LICENSE file.

Documentation

Index

Constants

const (
	RPCStatusOK             = 200
	RPCStatusClientError    = 400
	RPCStatusRequestTimeout = 408
	RPCStatusTooFraquently  = 429
	RPCStatusServerError    = 500

	DefaultQoS = 0
)

Variables

var (
	// ErrNoReply is an error indicating an empty reply.
	ErrNoReply = errors.New("[RRPC] empty reply")

	// ErrTooFraquently is an error indicating that the request was made too frequently.
	ErrTooFraquently = errors.New("[RRPC] too frequently, try again later")

	// ErrTimeout is an error indicating a timeout occurred.
	ErrTimeout = errors.New("[RRPC] timeout")
)
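Callers would normally tell these sentinel errors apart with errors.Is, which also sees through wrapped errors. The sketch below redeclares the documented constants and errors locally so that it compiles on its own. statusFor is a hypothetical helper, and the pairing of each error with a status code is an assumption based on the matching names, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented values, so the sketch needs no external module.
const (
	RPCStatusOK             = 200
	RPCStatusRequestTimeout = 408
	RPCStatusTooFraquently  = 429
	RPCStatusServerError    = 500
)

var (
	ErrTooFraquently = errors.New("[RRPC] too frequently, try again later")
	ErrTimeout       = errors.New("[RRPC] timeout")
)

// statusFor is a hypothetical helper (not part of the package) that picks a
// status code for an error. The mapping is an assumption for illustration.
func statusFor(err error) int {
	switch {
	case err == nil:
		return RPCStatusOK
	case errors.Is(err, ErrTimeout):
		return RPCStatusRequestTimeout
	case errors.Is(err, ErrTooFraquently):
		return RPCStatusTooFraquently
	default:
		return RPCStatusServerError
	}
}

func main() {
	// errors.Is still matches after the sentinel has been wrapped with %w.
	wrapped := fmt.Errorf("call device: %w", ErrTimeout)
	fmt.Println(statusFor(nil), statusFor(wrapped), statusFor(errors.New("boom")))
}
```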

Functions

This section is empty.

Types

type AfterResponseEvent

type AfterResponseEvent struct {
	Labels   prometheus.Labels
	Duration time.Duration
	Res      *Response
}

AfterResponseEvent represents an event that is emitted after a response is sent.

type ChildContext

type ChildContext interface {
	// ID returns the unique identifier of the context.
	ID() *ID

	// Method returns the name of the RPC method.
	Method() string

	// Reply sends a response message.
	// It returns true if the response was sent successfully, false otherwise.
	Reply(res *Response) bool

	// ReplyDesc returns the description of the reply message.
	ReplyDesc() string

	// Bind binds the request data to the context.
	Bind(request interface{}) error

	// PrometheusLabels returns the Prometheus labels associated with the context.
	PrometheusLabels() prometheus.Labels
}

ChildContext represents the base interface for reverse RPC contexts.

type Context

type Context interface {
	ChildContext

	// Ctx returns the underlying context.Context.
	Ctx() context.Context

	// GetResponse returns the response message.
	GetResponse() *Response

	// ReplyOK sends a successful response message with the given data.
	// It returns true if the response was sent successfully, false otherwise.
	ReplyOK(data interface{}) bool

	// ReplyError sends an error response message with the given status and error.
	// It returns true if the response was sent successfully, false otherwise.
	ReplyError(status int, err error) bool
}

Context represents the context for reverse RPC.

type Handler

type Handler struct {
	Method  func(c Context)
	Timeout time.Duration
}

Handler represents a reverse RPC handler. Method is the function to be executed when handling the request. Timeout is the maximum duration allowed for the request to complete.

type ID

type ID struct {
	Num uint64 // Num is the numeric value of the identifier.
	Str string // Str is the string value of the identifier.
}

ID represents an identifier with a numeric value and a string value.

func (*ID) String

func (id *ID) String() string

String returns the string representation of the ID. If the ID has a non-empty string representation, it returns the string representation. Otherwise, it returns the numeric representation of the ID as a decimal string.
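The fallback rule described above can be sketched as follows. This is a minimal reimplementation written from the description, using a local copy of the ID type; it is not the package's source.

```go
package main

import (
	"fmt"
	"strconv"
)

// ID is a local copy of the documented struct.
type ID struct {
	Num uint64
	Str string
}

// String prefers the string form and falls back to the decimal number.
func (id *ID) String() string {
	if id.Str != "" {
		return id.Str
	}
	return strconv.FormatUint(id.Num, 10)
}

func main() {
	fmt.Println((&ID{Num: 42}).String())               // 42
	fmt.Println((&ID{Num: 42, Str: "req-a"}).String()) // req-a
}
```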

type OnAfterResponseCallback

type OnAfterResponseCallback func(e *AfterResponseEvent)

OnAfterResponseCallback is a function type that represents a callback function to be executed after a response is sent. It takes a pointer to an AfterResponseEvent as its parameter.

type RequestContext

type RequestContext struct {
	// contains filtered or unexported fields
}

RequestContext is the Context implementation for reverse RPC.

func NewRequestContext

func NewRequestContext(ctx context.Context, childCtx ChildContext) *RequestContext

NewRequestContext creates a new instance of RequestContext with the given context and ChildContext. It returns a pointer to the newly created RequestContext.

func (*RequestContext) Bind

func (c *RequestContext) Bind(request interface{}) error

Bind binds the given request object to the context. It uses the underlying childCtx to perform the binding. Returns an error if the binding fails.

func (*RequestContext) Ctx

func (c *RequestContext) Ctx() context.Context

Ctx returns the context associated with the RequestContext. If no context is set, it returns the background context.

func (*RequestContext) GetResponse

func (c *RequestContext) GetResponse() *Response

GetResponse returns the response associated with the context. It acquires a lock on the response mutex to ensure thread safety. Returns the response object.

func (*RequestContext) ID

func (c *RequestContext) ID() *ID

ID returns the ID associated with the context.

func (*RequestContext) Method

func (c *RequestContext) Method() string

Method returns the name of the RPC method being called. It retrieves the method name from the underlying context instance.

func (*RequestContext) PrometheusLabels

func (c *RequestContext) PrometheusLabels() prometheus.Labels

PrometheusLabels returns the Prometheus labels associated with the context. It retrieves the Prometheus labels from the underlying context instance.

func (*RequestContext) Reply

func (c *RequestContext) Reply(res *Response) bool

Reply sends a response to the client. It sets the response and calls the BaseReply method to handle the response. If the reply has already been sent, it returns false. Otherwise, it sets the reply status to true and returns true.
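The reply-once behavior can be illustrated with a small guard around a flag. The sketch below is an assumption about how such a guard might look: onceReplier and its send field are hypothetical names, and the real RequestContext may be structured differently.

```go
package main

import (
	"fmt"
	"sync"
)

// Response is a local copy of the documented struct.
type Response struct {
	Result interface{}
	Error  error
	Status int
}

// onceReplier is a hypothetical stand-in for RequestContext.
type onceReplier struct {
	mu      sync.Mutex
	replied bool
	res     *Response
	send    func(*Response) // stands in for the transport-level reply
}

// Reply stores and sends the first response; later calls return false.
func (r *onceReplier) Reply(res *Response) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.replied {
		return false
	}
	r.replied = true
	r.res = res
	r.send(res)
	return true
}

func main() {
	sent := 0
	r := &onceReplier{send: func(*Response) { sent++ }}
	first := r.Reply(&Response{Status: 200})
	second := r.Reply(&Response{Status: 500})
	fmt.Println(first, second, sent) // true false 1
}
```

The practical consequence for handlers is that a late ReplyError after a successful ReplyOK is ignored, so the return value is worth checking when the outcome matters.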

func (*RequestContext) ReplyDesc

func (c *RequestContext) ReplyDesc() string

ReplyDesc returns the description of the reply message for the current context.

func (*RequestContext) ReplyError

func (c *RequestContext) ReplyError(status int, err error) bool

ReplyError sends an error response with the specified status code and error message. It returns true if the response was successfully sent, otherwise false.

func (*RequestContext) ReplyOK

func (c *RequestContext) ReplyOK(data interface{}) bool

ReplyOK sends a successful response with the given data. It returns true if the response was sent successfully, otherwise false.

type Response

type Response struct {
	Result interface{}
	Error  error
	Status int
}

Response represents a response message. Result holds the response data. Error holds any error that occurred during the request. Status holds the status code of the response.

type ReverseRPC

type ReverseRPC interface {
	Close() error
	IsConnected() bool
	Register(method string, hdl *Handler)
	RegisterMetrics(responseTime *prometheus.HistogramVec, errorCount *prometheus.GaugeVec)
}

ReverseRPC is an interface that represents a reverse RPC client.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server represents a reverse RPC server.

func NewServer

func NewServer(options ...ServerOption) *Server

NewServer creates a new instance of the Server struct with the provided options. It initializes the server with default values for the options that are not provided. The server instance is returned as a pointer.

func (*Server) Call

func (s *Server) Call(c Context)

Call handles the RPC call by executing the specified method and processing the response. It measures the duration of the call, logs the response if enabled, and emits an event after the response. If the call exceeds the timeout or encounters an error, it replies with an appropriate error message.
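One common way to enforce a per-handler timeout is to run the handler in a goroutine and race it against a context deadline. The sketch below shows that pattern under stated assumptions: callWithTimeout and demo are hypothetical names, the handler signature is simplified, and the package's actual Call method may work differently.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Local copies of two documented status codes.
const (
	RPCStatusOK             = 200
	RPCStatusRequestTimeout = 408
)

// callWithTimeout is a hypothetical helper: it runs handler and reports 408
// if the handler has not returned before the timeout elapses.
func callWithTimeout(parent context.Context, timeout time.Duration, handler func(ctx context.Context)) int {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	done := make(chan struct{})
	go func() {
		defer close(done)
		handler(ctx)
	}()

	select {
	case <-done:
		return RPCStatusOK
	case <-ctx.Done():
		return RPCStatusRequestTimeout
	}
}

// demo runs one fast and one slow handler and returns both status codes.
func demo() (fast, slow int) {
	fast = callWithTimeout(context.Background(), time.Second, func(context.Context) {})
	slow = callWithTimeout(context.Background(), 20*time.Millisecond, func(context.Context) {
		time.Sleep(200 * time.Millisecond) // simulates slow work that ignores the deadline
	})
	return fast, slow
}

func main() {
	fast, slow := demo()
	fmt.Println(fast, slow) // 200 408
}
```

Note that in this pattern a handler that ignores the context keeps running after the timeout reply; handlers that watch Ctx() can stop early.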

func (*Server) OnAfterResponse

func (s *Server) OnAfterResponse(cb OnAfterResponseCallback)

OnAfterResponse registers a callback function to be executed after each response is sent. The provided callback function will be added to the callback list of the server. The callback function will be called with a pointer to an AfterResponseEvent as its argument.
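A callback list of this kind can be sketched as below. The emitter type and its emit method are hypothetical, and the Labels field of AfterResponseEvent is left out so the sketch needs only the standard library.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Response is a local copy of the documented struct.
type Response struct {
	Result interface{}
	Error  error
	Status int
}

// AfterResponseEvent is a simplified local copy (Labels omitted).
type AfterResponseEvent struct {
	Duration time.Duration
	Res      *Response
}

type OnAfterResponseCallback func(e *AfterResponseEvent)

// emitter is a hypothetical stand-in for the server's callback list.
type emitter struct {
	mu        sync.Mutex
	callbacks []OnAfterResponseCallback
}

// OnAfterResponse appends cb to the callback list.
func (s *emitter) OnAfterResponse(cb OnAfterResponseCallback) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.callbacks = append(s.callbacks, cb)
}

// emit calls every registered callback, in registration order.
func (s *emitter) emit(e *AfterResponseEvent) {
	s.mu.Lock()
	cbs := append([]OnAfterResponseCallback(nil), s.callbacks...)
	s.mu.Unlock()
	for _, cb := range cbs {
		cb(e)
	}
}

func main() {
	var s emitter
	s.OnAfterResponse(func(e *AfterResponseEvent) {
		fmt.Printf("status=%d took=%s\n", e.Res.Status, e.Duration)
	})
	s.emit(&AfterResponseEvent{Duration: 12 * time.Millisecond, Res: &Response{Status: 200}})
}
```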

func (*Server) Register

func (s *Server) Register(method string, hdl *Handler)

Register registers a method with its corresponding handler in the server. If the method is already registered, it will be overridden. The method parameter specifies the name of the method. The hdl parameter is a pointer to the handler that will be associated with the method.
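The override rule follows naturally from storing handlers in a map keyed by method name. The sketch below assumes such a map; registry, newRegistry and lookup are hypothetical names, and Handler is simplified to a function returning a string so the effect is easy to observe.

```go
package main

import (
	"fmt"
	"sync"
)

// Handler is simplified here; the documented Method takes a Context.
type Handler struct {
	Method func() string
}

// registry is a hypothetical stand-in for the server's handler table.
type registry struct {
	mu       sync.RWMutex
	handlers map[string]*Handler
}

func newRegistry() *registry {
	return &registry{handlers: make(map[string]*Handler)}
}

// Register stores hdl under method, replacing any earlier handler.
func (r *registry) Register(method string, hdl *Handler) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.handlers[method] = hdl
}

// lookup returns the handler currently registered for method.
func (r *registry) lookup(method string) (*Handler, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	hdl, ok := r.handlers[method]
	return hdl, ok
}

func main() {
	r := newRegistry()
	r.Register("ping", &Handler{Method: func() string { return "v1" }})
	r.Register("ping", &Handler{Method: func() string { return "v2" }})
	hdl, _ := r.lookup("ping")
	fmt.Println(hdl.Method()) // v2
}
```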

func (*Server) RegisterMetrics

func (s *Server) RegisterMetrics(responseTime *prometheus.HistogramVec, errorCount *prometheus.GaugeVec)

RegisterMetrics registers metrics for monitoring the server's response time and error count. It takes two parameters: responseTime, a Prometheus HistogramVec for tracking response time, and errorCount, a Prometheus GaugeVec for counting errors. The function adds an event listener to the server's OnAfterResponse event, which is triggered after each response is sent. Inside the event listener, it extracts the response status and labels from the event, and updates the labels with additional information. It then uses the responseTime HistogramVec to record the response time, and the errorCount GaugeVec to increment the error count if there is an error in the response.

type ServerOption

type ServerOption func(o *serverOptions)

ServerOption is a functional option for configuring the server.

func WithLimiter

func WithLimiter(d time.Duration, count int) ServerOption

WithLimiter is a function that returns a ServerOption which sets the limiter duration and count for the server. The limiter duration specifies the time window in which the server can handle a certain number of requests. The limiter count specifies the maximum number of requests that can be handled within the specified time window. Default values are 1 second and 5 requests.

func WithLimiterReject

func WithLimiterReject() ServerOption

WithLimiterReject returns a ServerOption that sets the limiterReject field of the serverOptions struct to true. This is the default behavior. If you want the server to wait for available resources instead of rejecting requests when the limiter is full, use WithLimiterWait.

func WithLimiterWait

func WithLimiterWait() ServerOption

WithLimiterWait returns a ServerOption that sets the limiterReject field of the serverOptions struct to false. This allows the server to wait for available resources instead of rejecting requests when the limiter is full. If you want the server to reject requests when the limiter is full, use WithLimiterReject.
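The difference between rejecting and waiting can be seen with a simple fixed-window counter. This sketch is an illustration only: fixedWindow, delay and demo are hypothetical names, the type is not safe for concurrent use, and the package's real limiter may use a different algorithm. In reject mode a non-zero delay would translate into an immediate error reply; in wait mode the caller would sleep for that delay and try again.

```go
package main

import (
	"fmt"
	"time"
)

// fixedWindow is a hypothetical limiter: at most limit requests per window.
// It is not safe for concurrent use.
type fixedWindow struct {
	window time.Duration
	limit  int
	start  time.Time
	used   int
}

// delay takes a slot and returns 0 if one is free at time now; otherwise it
// returns how long the caller would have to wait for the next window.
func (f *fixedWindow) delay(now time.Time) time.Duration {
	if now.Sub(f.start) >= f.window {
		f.start, f.used = now, 0 // a new window begins
	}
	if f.used < f.limit {
		f.used++
		return 0
	}
	return f.start.Add(f.window).Sub(now)
}

// demo fills a window of 5 requests per second, then asks again 400ms later.
func demo() (rejected bool, retryAfter time.Duration, admittedLater bool) {
	base := time.Date(2024, 4, 16, 0, 0, 0, 0, time.UTC)
	f := &fixedWindow{window: time.Second, limit: 5}
	for i := 0; i < 5; i++ {
		f.delay(base) // use up the window
	}
	at := base.Add(400 * time.Millisecond)
	retryAfter = f.delay(at)
	rejected = retryAfter > 0 // reject mode: reply with an error now
	// Wait mode: sleep for retryAfter, then the request is admitted.
	admittedLater = f.delay(at.Add(retryAfter)) == 0
	return rejected, retryAfter, admittedLater
}

func main() {
	rejected, retryAfter, admittedLater := demo()
	fmt.Println(rejected, retryAfter, admittedLater) // true 600ms true
}
```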

func WithLogResponse

func WithLogResponse(logResponse bool) ServerOption

WithLogResponse is a function that returns a ServerOption to enable or disable logging of response. It takes a boolean parameter logResponse, which determines whether to log the response or not. The returned ServerOption modifies the serverOptions struct by setting the logResponse field.

func WithServerName

func WithServerName(name string) ServerOption

WithServerName is a function that returns a ServerOption to set the name of the server. The name parameter specifies the name of the server. It returns a function that takes a pointer to serverOptions and sets the name field.

func WithWorkerNum

func WithWorkerNum(count int) ServerOption

WithWorkerNum is a function that returns a ServerOption which sets the number of workers for the server. The count parameter specifies the number of workers to be set.
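The options above follow the functional options pattern: the constructor starts from defaults and lets each option mutate them. The sketch below reproduces the pattern with local types. The defaults for the limiter (1 second, 5 requests, reject mode) come from the descriptions above; the field names limiterDuration and limiterCount and the applyOptions helper are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// serverOptions is a local sketch; some field names are assumed.
type serverOptions struct {
	name            string
	limiterDuration time.Duration
	limiterCount    int
	limiterReject   bool
	logResponse     bool
}

type ServerOption func(o *serverOptions)

func WithServerName(name string) ServerOption {
	return func(o *serverOptions) { o.name = name }
}

func WithLimiter(d time.Duration, count int) ServerOption {
	return func(o *serverOptions) {
		o.limiterDuration = d
		o.limiterCount = count
	}
}

func WithLimiterWait() ServerOption {
	return func(o *serverOptions) { o.limiterReject = false }
}

func WithLogResponse(logResponse bool) ServerOption {
	return func(o *serverOptions) { o.logResponse = logResponse }
}

// applyOptions is a hypothetical helper: defaults first, then each option in order.
func applyOptions(options ...ServerOption) serverOptions {
	o := serverOptions{
		limiterDuration: time.Second,
		limiterCount:    5,
		limiterReject:   true,
	}
	for _, opt := range options {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(WithServerName("gateway"), WithLimiter(2*time.Second, 10), WithLimiterWait())
	fmt.Println(o.name, o.limiterDuration, o.limiterCount, o.limiterReject) // gateway 2s 10 false
}
```

Because options are applied in order, a later option wins when two options touch the same field.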

Directories

Path Synopsis
Package mock_reverserpc is a generated GoMock package.
Package mqtt provides options for configuring MQTT client.
mock
Package mock_mqttadapter is a generated GoMock package.
mock/mqtt
Package mock_mqtt is a generated GoMock package.
