rpc

package module
v0.3.1 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 24, 2024 License: MIT Imports: 11 Imported by: 1

README

RPC Redis

rpc-redis is a Go package that implements a JSON-RPC-like protocol over Redis Streams and channels. This package allows you to build scalable and efficient RPC servers and clients using Redis as the underlying transport mechanism.

Features

  • JSON-RPC-like Protocol: Implements a protocol similar to JSON-RPC for seamless integration.
  • Redis Streams and Channels: Utilizes Redis Streams and channels for message passing, ensuring high performance and reliability.
  • Easy to Use: Simple API for setting up RPC servers and clients.
  • Flexible Handlers: Easily add custom handlers for different RPC methods.

Installation

To install the package, run:

go get github.com/yourusername/rpc-redis

RPC Server

Below is an example of how to set up an RPC server:

redisClient := redis.NewClient(&redis.Options{
    Addr: "localhost:6379",
})

rpcServer := rpc.NewServer(redisClient, "echo.EchoService", "echo-group", "echo-consumer")

// Handlers receive a *rpc.Request, matching the Handler type.
rpcServer.AddHandler("Echo", func(req *rpc.Request) (any, error) {
    var echoReq EchoRequest
    if err := req.ParseParams(&echoReq); err != nil {
        return nil, fmt.Errorf("error parsing request: %w", err)
    }

    // The stash carries per-call metadata set by the client via rpc.SetStash.
    var stash Stash
    if err := rpc.ParseStash(req.Context(), &stash); err != nil {
        return nil, fmt.Errorf("error parsing stash: %w", err)
    }

    slog.Info("Received request: " + echoReq.Value)

    return &echoReq, nil
})

slog.Info("Starting RPC server")
if err := rpcServer.Run(); err != nil {
    slog.Error("Error running RPC server: " + err.Error())
}

slog.Info("Server stopped")
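
The server example above refers to EchoRequest and Stash without defining them. As a minimal sketch, assuming both are plain structs with a single JSON-tagged Value field (the field tags and the roundTrip helper are illustrative assumptions, not part of the package), they might look like this:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EchoRequest is a hypothetical payload type; the README does not define it.
type EchoRequest struct {
	Value string `json:"value"`
}

// Stash is a hypothetical per-call metadata type; the README does not define it.
type Stash struct {
	Value string `json:"value"`
}

// roundTrip encodes v as JSON and decodes it into out, which is roughly what
// a JSON-based transport does to params and stash values in transit.
func roundTrip(v, out any) error {
	data, err := json.Marshal(v)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, out)
}

func main() {
	var got EchoRequest
	if err := roundTrip(&EchoRequest{Value: "Hello, world!"}, &got); err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Println(got.Value)
}
```

Any type that encoding/json can marshal and unmarshal should work equally well for params and stash values.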

RPC Client

Below is an example of how to set up an RPC client:

redisClient := redis.NewClient(&redis.Options{
    Addr: "localhost:6379",
})
defer redisClient.Close()

rpcClient := rpc.NewClient(redisClient, "echo.EchoService")
defer rpcClient.Close()

ctx := context.Background()
ctx, err := rpc.SetStash(ctx, &Stash{Value: "Hello, stash!"})
if err != nil {
    slog.Error("Error setting stash: " + err.Error())
    return
}

resp, err := rpcClient.Call(ctx, "Echo", &EchoRequest{Value: "Hello, world!"})

if err != nil {
    slog.Error("Error calling RPC: " + err.Error())
    return
}

var result EchoRequest
// ParseResut is the method's actual (misspelled) name in this version.
err = resp.ParseResut(&result)
if err != nil {
    slog.Error("Error parsing result: " + err.Error())
    return
}

fmt.Println(result)

Contributing

Contributions are welcome! Please feel free to submit a pull request or open an issue if you encounter any problems or have suggestions for improvements.

License

This project is licensed under the MIT License.

Documentation

Constants

const (
	DefaultBlockInterval = 10 * time.Second
	DefaultConcurency    = 25
)

Variables

var ErrClientClosed = errors.New("client closed")
var ErrNilContext = errors.New("context is nil")
var ErrNoStash = errors.New("stash not found in context")
var ErrUnexpectedStashType = errors.New("unexpected stash type")

Functions

func ParseStash added in v0.3.0

func ParseStash(ctx context.Context, v any) error

ParseStash parses the stash value from the context and unmarshals it into the provided value. It returns an error if the stash value is not found in the context or if it has an unexpected type.

func SetStash added in v0.3.0

func SetStash(ctx context.Context, stash any) (context.Context, error)

SetStash sets the stash value in the context. It marshals the stash value to JSON and stores it in the context using a specific key. If the context is nil, it returns an error. If there is an error while marshalling the stash value, it returns an error with the specific error message. Otherwise, it returns the updated context with the stash value set.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(redisClient *redis.Client, channel string, opts ...ClientOption) *Client

NewClient creates a new instance of the Client struct. It takes a Redis client, a channel name, and zero or more ClientOption values that configure the client. It returns a pointer to the newly created Client.

func (*Client) Call

func (c *Client) Call(ctx context.Context, method string, params any) (*Response, error)

Call sends a request to the server using the specified method and parameters. It returns the response received from the server or an error if the request fails. The context `ctx` is used for cancellation and timeout. The `method` parameter specifies the method to be called on the server. The `params` parameter contains the parameters to be passed to the server method. The returned response is of type `*Response` and contains the result of the server method call. If an error occurs during the request or if the server returns an error response, an error is returned with a descriptive error message.

func (*Client) Close

func (c *Client) Close()

Close cancels any pending requests and closes the client connection.

type ClientOption added in v0.3.0

type ClientOption func(*Client)

ClientOption is a function type that can be used to configure a Client. It takes a pointer to a Client and modifies its properties.

func WithInterceptors added in v0.3.0

func WithInterceptors(interceptors ...Interceptor) ClientOption

WithInterceptors adds the provided interceptors to the client.

type Handler

type Handler func(req *Request) (any, error)

type Interceptor added in v0.3.0

type Interceptor func(next RequestHandler) RequestHandler

type Request

type Request struct {
	Method  string
	ID      string
	ReplyTo string
	// contains filtered or unexported fields
}

func NewRequest

func NewRequest(ctx context.Context, method, id, params, replyTo string) *Request

NewRequest creates a new Request with the provided parameters. It returns a pointer to the newly created Request.

func (*Request) Context

func (r *Request) Context() context.Context

Context returns the context of the request.

func (*Request) ParseParams

func (r *Request) ParseParams(v any) error

ParseParams parses the JSON-encoded parameters of the request into the provided value. The value must be a pointer to the desired type.
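
The pointer requirement comes from how JSON decoding works in Go. As a minimal sketch, assuming the parameters are held as a JSON string (as the NewRequest signature suggests) and decoded with encoding/json, a local equivalent could look like this; the request and echoParams types are illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// request is a local stand-in that keeps its parameters as raw JSON text.
type request struct {
	params string
}

// parseParams decodes the JSON parameters into v, which must be a pointer.
func (r *request) parseParams(v any) error {
	return json.Unmarshal([]byte(r.params), v)
}

// echoParams is a hypothetical parameter type.
type echoParams struct {
	Value string `json:"value"`
}

func main() {
	req := &request{params: `{"value":"Hello, world!"}`}

	var p echoParams
	if err := req.parseParams(&p); err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	fmt.Println(p.Value)

	// Passing a non-pointer cannot work: the decoder has nowhere to write.
	if err := req.parseParams(p); err != nil {
		fmt.Println("non-pointer rejected:", err)
	}
}
```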

func (*Request) WithContext added in v0.3.0

func (r *Request) WithContext(ctx context.Context) *Request

WithContext returns a copy of r with its context changed to ctx. The provided ctx must be non-nil.

type RequestHandler added in v0.3.0

type RequestHandler func(req *Request) (*Response, error)

type Response

type Response struct {
	ID     string          `json:"id"`
	Error  string          `json:"error,omitempty"`
	Result json.RawMessage `json:"result,omitempty"`
}

Response represents the response structure for a Redis RPC call.

func NewResponse added in v0.3.0

func NewResponse(id string, result any, err error) (*Response, error)

NewResponse creates a new instance of the Response struct.

func (*Response) ParseResut

func (r *Response) ParseResut(v interface{}) error

ParseResut parses the result of the response into the provided value. The result is expected to be in JSON format and will be unmarshaled into the provided value.

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(redisClient *redis.Client, stream, group, consumer string, opts ...ServerOption) *Server

NewServer creates a new instance of the Server struct. It takes a Redis client, a stream name, a consumer group name, a consumer name, and zero or more ServerOption values that configure the server. It returns a pointer to the newly created Server instance.

func (*Server) AddHandler

func (s *Server) AddHandler(rpcName string, handler Handler)

AddHandler adds a new RPC handler to the server. It associates the given `handler` with the specified `rpcName`. If a handler already exists for the same `rpcName`, it panics.
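
Because a duplicate registration panics rather than returning an error, registration mistakes surface at startup. A minimal sketch of that behaviour with a local registry follows; the registry type, the simplified handler signature, and the addSafely helper are illustrative assumptions.

```go
package main

import "fmt"

// handler is a simplified stand-in for the package's Handler type.
type handler func(params string) (any, error)

// registry maps RPC names to handlers.
type registry struct {
	handlers map[string]handler
}

func newRegistry() *registry {
	return &registry{handlers: make(map[string]handler)}
}

// add registers h under name and panics if the name is already taken.
func (r *registry) add(name string, h handler) {
	if _, exists := r.handlers[name]; exists {
		panic("handler already registered: " + name)
	}
	r.handlers[name] = h
}

// addSafely converts the panic into an error, which can be useful in tests.
func addSafely(r *registry, name string, h handler) (err error) {
	defer func() {
		if rec := recover(); rec != nil {
			err = fmt.Errorf("%v", rec)
		}
	}()
	r.add(name, h)
	return nil
}

func main() {
	r := newRegistry()
	echo := func(params string) (any, error) { return params, nil }

	fmt.Println(addSafely(r, "Echo", echo)) // first registration succeeds
	fmt.Println(addSafely(r, "Echo", echo)) // second one reports the panic
}
```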

func (*Server) Close

func (s *Server) Close()

Close stops the server gracefully by cancelling the context and waiting for all goroutines to finish.
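
Cancelling a context and then waiting on the goroutines is a common shutdown pattern in Go. The sketch below shows it with a local server type; the field names and the worker body are assumptions, and the package's internals may differ.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// server is a local stand-in that owns a cancellable context and its workers.
type server struct {
	ctx     context.Context
	cancel  context.CancelFunc
	wg      sync.WaitGroup
	stopped int32 // number of workers that have exited
}

func newServer() *server {
	ctx, cancel := context.WithCancel(context.Background())
	return &server{ctx: ctx, cancel: cancel}
}

// start launches n workers that run until the context is cancelled.
func (s *server) start(n int) {
	for i := 0; i < n; i++ {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			<-s.ctx.Done() // a real worker would process messages here
			atomic.AddInt32(&s.stopped, 1)
		}()
	}
}

// close signals all workers to stop and blocks until they have finished.
func (s *server) close() {
	s.cancel()
	s.wg.Wait()
}

// stoppedCount reports how many workers have exited.
func (s *server) stoppedCount() int {
	return int(atomic.LoadInt32(&s.stopped))
}

func main() {
	s := newServer()
	s.start(3)
	s.close()
	fmt.Println(s.stoppedCount())
}
```

Waiting on the WaitGroup after cancelling is what makes the shutdown graceful: close does not return while a worker is still mid-task.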

func (*Server) Run

func (s *Server) Run() error

Run starts the server and reads messages from the Redis stream in a loop. It initializes the reader, sets up the read arguments, and passes each message it reads to the `processMessage` method.

If an error occurs during initialization or while reading the stream, Run returns that error. If the stream is empty, it continues to the next iteration.

type ServerOption added in v0.3.0

type ServerOption func(*Server)

ServerOption is a function type that can be used to configure a Server. It takes a pointer to a Server and modifies its properties.

func WithServerInterceptors added in v0.3.0

func WithServerInterceptors(interceptors ...Interceptor) ServerOption

WithServerInterceptors returns a ServerOption that sets the interceptors for the server. The interceptors are a list of Interceptor functions that will be applied to incoming requests.

Directories

Path Synopsis
examples
