Version: v0.33.0

Published: Jun 4, 2022 License: MIT Imports: 10 Imported by: 303



Package rpcc provides an RPC client connection with support for the JSON-RPC 2.0 specification, not including Batch requests. Server side RPC notifications are also supported.

Dial connects to an RPC server listening on a websocket using the gorilla/websocket package.

conn, err := rpcc.Dial("ws://")
// ...

The user must close the connection when finished with it:

conn, err := rpcc.Dial("ws://")
if err != nil {
	// Handle error.
}
defer conn.Close()
// ...

A custom dialer can be used to change the websocket lib or communicate over other protocols.

netDial := func(ctx context.Context, addr string) (io.ReadWriteCloser, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	// Wrap connection to handle writing JSON.
	// ...
	return conn, nil
}
conn, err := rpcc.Dial("", rpcc.WithDialer(netDial))
// ...

Communicating with the server

Send a request using Invoke:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := rpcc.Invoke(ctx, "Domain.method", args, reply, conn)
// ...

Receive a notification using NewStream:

stream, err := rpcc.NewStream(ctx, "Domain.event", conn)
if err != nil {
	// Handle error.
}
err = stream.RecvMsg(&reply)
if err != nil {
	// Handle error.
}

The stream should be closed when it is no longer used to avoid leaking memory:

stream, err := rpcc.NewStream(ctx, "Domain.event", conn)
if err != nil {
	// Handle error.
}
defer stream.Close()

When order is important, two streams can be synchronized with Sync:

err := rpcc.Sync(stream1, stream2)
if err != nil {
	// Handle error.
}





var ErrConnClosing = &closeError{msg: "rpcc: the connection is closing"}

ErrConnClosing indicates that the operation is illegal because the connection is closing.

var ErrStreamClosing = &closeError{msg: "rpcc: the stream is closing"}

ErrStreamClosing indicates that the operation is illegal because the stream is closing and there are no pending messages.


func Invoke

func Invoke(ctx context.Context, method string, args, reply interface{}, conn *Conn) error

Invoke sends an RPC request and blocks until the response is received. This function is called by generated code but can be used to issue requests manually.

func Sync added in v0.12.0

func Sync(s ...Stream) (err error)

Sync takes two or more streams and sets them into synchronous operation, relative to each other. This operation cannot be undone. If an error is returned, this function is a no-op and the streams continue in asynchronous operation.

All streams must belong to the same Conn and they must not be closed. Passing multiple streams of the same method to Sync is not supported and will return an error.

A stream that is closed is removed and has no further effect on the streams that were synchronized.

When two streams, A and B, are in sync they will both receive messages in the order that they arrived on Conn. If a message for both A and B arrives, in that order, it will not be possible to receive the message from B before the message from A has been received.
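The ordering rule can be pictured with a toy model. This is only an illustration under stated assumptions, not how this package implements Sync: the `syncModel` type and its methods are hypothetical, and where the model returns false a real RecvMsg call would block instead.

```go
package main

// syncModel is a toy model of the ordering rule described above. It
// records the order in which messages arrived on the connection and only
// lets the stream at the head of that order receive.
type syncModel struct {
	arrivals []string // stream names, oldest arrival first
}

// arrive records that a message for the named stream arrived.
func (m *syncModel) arrive(stream string) {
	m.arrivals = append(m.arrivals, stream)
}

// tryRecv reports whether the named stream may receive now and, if so,
// consumes its message. A real synchronized stream would block here
// rather than return false.
func (m *syncModel) tryRecv(stream string) bool {
	if len(m.arrivals) == 0 || m.arrivals[0] != stream {
		return false
	}
	m.arrivals = m.arrivals[1:]
	return true
}
```

With arrivals recorded for A and then B, `tryRecv("B")` fails until `tryRecv("A")` has succeeded, which mirrors the guarantee stated for synchronized streams.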


type Codec

type Codec interface {
	// WriteRequest encodes and writes the request onto the
	// underlying connection. Request is re-used between writes and
	// references to it should not be kept.
	WriteRequest(*Request) error
	// ReadResponse decodes a response from the underlying
	// connection. Response is re-used between reads and references
	// to it should not be kept.
	ReadResponse(*Response) error
}

Codec is used by recv and dispatcher to send and receive RPC communication.
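As a rough sketch of what a custom codec might look like, the following frames each message as one line of JSON. The `lineCodec` type, `newLineCodec` and `roundTrip` are hypothetical names, and Request, Response and ResponseError are local copies of the types documented on this page so that the example compiles on its own. In real use the constructor would return the package's Codec so it can be handed to WithCodec.

```go
package main

import (
	"bytes"
	"encoding/json"
	"io"
	"strings"
)

// Local copies of the documented types, declared only for this sketch.
type Request struct {
	ID     uint64      `json:"id"`
	Method string      `json:"method"`
	Args   interface{} `json:"params,omitempty"`
}

type ResponseError struct {
	Code    int64  `json:"code"`
	Message string `json:"message"`
	Data    string `json:"data"`
}

type Response struct {
	ID     uint64          `json:"id"`
	Result json.RawMessage `json:"result"`
	Error  *ResponseError  `json:"error"`
	Method string          `json:"method"`
	Args   json.RawMessage `json:"params"`
}

// lineCodec is a hypothetical codec with the same method set as Codec.
type lineCodec struct {
	enc *json.Encoder
	dec *json.Decoder
}

// newLineCodec takes the same argument as the function passed to WithCodec.
func newLineCodec(conn io.ReadWriter) *lineCodec {
	return &lineCodec{
		enc: json.NewEncoder(conn), // Encode appends a newline to each value.
		dec: json.NewDecoder(conn),
	}
}

func (c *lineCodec) WriteRequest(r *Request) error { return c.enc.Encode(r) }

func (c *lineCodec) ReadResponse(r *Response) error {
	*r = Response{} // Response is re-used between reads, so clear stale fields.
	return c.dec.Decode(r)
}

// roundTrip writes one request and reads one canned response, using
// in-memory buffers in place of a network connection.
func roundTrip() (wire string, resp Response, err error) {
	var out bytes.Buffer
	in := strings.NewReader(`{"id":1,"result":{"ok":true}}` + "\n")
	codec := newLineCodec(struct {
		io.Reader
		io.Writer
	}{in, &out})
	if err = codec.WriteRequest(&Request{ID: 1, Method: "Domain.method"}); err != nil {
		return "", resp, err
	}
	err = codec.ReadResponse(&resp)
	return out.String(), resp, err
}
```

Clearing the Response before decoding is a defensive choice here: because the value is re-used between reads, fields left over from a previous message could otherwise survive a decode that does not set them.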

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn represents an active RPC connection.

func Dial

func Dial(target string, opts ...DialOption) (*Conn, error)

Dial connects to target and returns an active connection. The target should be a WebSocket URL, "ws://" for HTTP and "wss://" for HTTPS. Example: "ws://localhost:9222/target".

func DialContext

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *Conn, err error)

DialContext is like Dial, with a caller provided Context. A nil Context will panic.

func (*Conn) Close

func (c *Conn) Close() error

Close closes the connection.

func (*Conn) Context added in v0.17.0

func (c *Conn) Context() context.Context

Context returns the underlying context for this connection.

func (*Conn) SetCompressionLevel

func (c *Conn) SetCompressionLevel(level int) error

SetCompressionLevel sets the flate compression level for writes. Valid level range is [-2, 9]. Returns error if compression is not enabled for Conn. See package compress/flate for a description of compression levels.

type DialOption

type DialOption func(*dialOptions)

DialOption represents a dial option passed to Dial.

func WithCodec

func WithCodec(f func(conn io.ReadWriter) Codec) DialOption

WithCodec returns a DialOption that sets the codec responsible for encoding and decoding requests and responses onto the connection. This option overrides the default JSON codec.

func WithCompression

func WithCompression() DialOption

WithCompression returns a DialOption that enables compression for the underlying websocket connection. Use SetCompressionLevel on Conn to change the default compression level for subsequent writes.

func WithDialer

func WithDialer(f func(ctx context.Context, addr string) (io.ReadWriteCloser, error)) DialOption

WithDialer returns a DialOption that sets the dialer for the underlying connection. It can be used to replace the WebSocket library used by this package or to communicate over a different protocol.

This option overrides the default WebSocket dialer, and all of WithCompression, WithTLSClientConfig and WithWriteBufferSize become no-ops.

func WithTLSClientConfig added in v0.21.0

func WithTLSClientConfig(c *tls.Config) DialOption

WithTLSClientConfig specifies the TLS configuration to use with tls.Client.

func WithWriteBufferSize

func WithWriteBufferSize(n int) DialOption

WithWriteBufferSize returns a DialOption that sets the size of the write buffer for the underlying websocket connection. Messages larger than this size are fragmented according to the websocket specification.

The maximum buffer size for recent versions of Chrome is 104857586 (~100MB); for older versions, a maximum of 1048562 (~1MB) can be used. This is because Chrome does not support websocket fragmentation.
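Both figures are exactly 14 bytes below a round power-of-two size, and 14 bytes is the largest frame header defined by the WebSocket protocol (RFC 6455). The sketch below reproduces the arithmetic. That the limits were derived this way is an assumption, since this documentation does not state it, and the constant and function names are hypothetical.

```go
package main

// maxFrameHeader is the largest WebSocket frame header defined by
// RFC 6455: 2 bytes of flags and length, 8 bytes of extended payload
// length and 4 bytes of masking key.
const maxFrameHeader = 2 + 8 + 4

// Assumption: the documented limits are round message sizes minus room
// for one frame header. The arithmetic matches the quoted figures.
const (
	recentChromeBuffer = 100*1024*1024 - maxFrameHeader // 104857586
	olderChromeBuffer  = 1*1024*1024 - maxFrameHeader   // 1048562
)

// needsFragmentation reports whether a message of messageLen bytes would
// exceed a write buffer of bufferSize bytes and therefore be fragmented.
func needsFragmentation(messageLen, bufferSize int) bool {
	return messageLen > bufferSize
}
```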

type Request

type Request struct {
	ID     uint64      `json:"id"`               // ID chosen by client.
	Method string      `json:"method"`           // Method invoked on remote.
	Args   interface{} `json:"params,omitempty"` // Method parameters, if any.
}

Request represents an RPC request to be sent to the server.

type Response

type Response struct {
	// RPC response to a Request.
	ID     uint64          `json:"id"`     // Echoes that of the Request.
	Result json.RawMessage `json:"result"` // Result from invocation, if any.
	Error  *ResponseError  `json:"error"`  // Error, if any.

	// RPC notification from remote.
	Method string          `json:"method"` // Method invocation requested by remote.
	Args   json.RawMessage `json:"params"` // Method parameters, if any.
}

Response represents an RPC response or notification sent by the server.
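Because one struct covers both replies and notifications, a decoded message has to be classified by which fields are set. The sketch below shows one way to do that with encoding/json. It assumes that a non-empty "method" marks a notification, which follows from the field comments but is not spelled out in this documentation. The types are local copies of the ones documented here, and `kind` is a hypothetical helper.

```go
package main

import "encoding/json"

// Local copies of the documented types, declared only for this sketch.
type ResponseError struct {
	Code    int64  `json:"code"`
	Message string `json:"message"`
	Data    string `json:"data"`
}

type Response struct {
	ID     uint64          `json:"id"`
	Result json.RawMessage `json:"result"`
	Error  *ResponseError  `json:"error"`
	Method string          `json:"method"`
	Args   json.RawMessage `json:"params"`
}

// kind decodes one message and classifies it as "notification", "error"
// or "result". Assumption: only notifications carry a method name.
func kind(data []byte) (string, error) {
	var r Response
	if err := json.Unmarshal(data, &r); err != nil {
		return "", err
	}
	switch {
	case r.Method != "":
		return "notification", nil
	case r.Error != nil:
		return "error", nil
	default:
		return "result", nil
	}
}
```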

func (*Response) String

func (r *Response) String() string

type ResponseError

type ResponseError struct {
	Code    int64  `json:"code"`
	Message string `json:"message"`
	Data    string `json:"data"`
}

ResponseError represents the RPC response error sent by the server.

func (*ResponseError) Error

func (e *ResponseError) Error() string

type Stream

type Stream interface {
	// Ready returns a channel that is closed when a message is
	// ready to be received via RecvMsg. Ready indicates that a call
	// to RecvMsg is non-blocking.
	// Ready must not be called concurrently while relying on the
	// non-blocking behavior of RecvMsg. In this case both
	// goroutines will be competing for the same message and one
	// will block until the next message is available.
	// Calling Close on the Stream will close the Ready channel
	// indefinitely, pending messages may still be received via
	// RecvMsg.
	// Ready is provided for use in select statements.
	Ready() <-chan struct{}
	// RecvMsg unmarshals pending messages onto m. Blocks until the
	// next message is received, context is canceled or stream is
	// closed.
	// When m is a *[]byte the message will not be decoded and the
	// raw bytes are copied into m.
	RecvMsg(m interface{}) error
	// Close closes the stream and no new messages will be received.
	// RecvMsg will return ErrStreamClosing once all pending messages
	// have been received.
	Close() error
}

Stream represents a stream of notifications for a certain method.

func NewStream

func NewStream(ctx context.Context, method string, conn *Conn) (Stream, error)

NewStream creates a new stream that listens to notifications from the RPC server. This function is called by generated code.


