vrpc

v0.2.5
Published: Feb 2, 2026 License: Apache-2.0 Imports: 19 Imported by: 0

README

vrpc


RPC framework for Go running over standard HTTP.

  • No code generation.
  • Built on top of standard net/http without manual connection management.
  • Compatible with standard proxies, balancers and service mesh environments.
  • Contracts are defined using standard Go structs and methods.
  • The service name can be derived from a generic type argument or from the implementation type.
  • Supports synchronous calls, asynchronous notifications and streaming from any side.
  • Pluggable codecs (selected by Content-Type).
    JSON, gob and msgpack are supported out of the box.
  • Client uses msgpack by default.
  • Server falls back to JSON when Content-Type is missing.
Why?

To combine the simplicity of net/rpc with the infrastructure compatibility of standard HTTP.

Comparison
| Feature       | vrpc            | gRPC            | net/rpc         | REST              |
|---------------|-----------------|-----------------|-----------------|-------------------|
| Transport     | HTTP/1.1+       | HTTP/2          | Custom TCP/HTTP | HTTP/1.1+         |
| Contracts     | Go Structs      | Protobuf        | Go Structs      | OpenAPI / Swagger |
| Code Gen      | None            | Required        | None            | Optional          |
| Semantics     | RPC (Actions)   | RPC (Actions)   | RPC (Actions)   | Resources (CRUD)  |
| Context       | context.Context | context.Context | Limited         | context.Context   |
| Performance   | Moderate        | High            | High            | Varies            |
| Compatibility | High            | Moderate        | Low             | High              |
| Streaming     | One-way         | Bidi            | No              | Varies            |

Usage

Service methods must have one of the following signatures:

// unary
func (s *Service) Method(ctx context.Context, req *Request) (*Response, error)

// server streaming
func (s *Service) Method(ctx context.Context, req *Request, w io.Writer) (*Response, error)

// client streaming
func (s *Service) Method(ctx context.Context, req *Request, r io.Reader) (*Response, error)

The request parameter may be either a value or a pointer.

Bidirectional streaming is hard to get right on top of HTTP/1, so it is not supported.

Server

type HelloRequest struct{ Name string }
type HelloResponse struct{ Greeting string }

type GreeterService struct{}

func (s *GreeterService) SayHello(ctx context.Context, req *HelloRequest) (*HelloResponse, error) {
    return &HelloResponse{Greeting: "Hello, " + req.Name}, nil
}

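func main() {
    s := new(GreeterService)

    // Pick one constructor; all three use the service name "GreeterService".
    handler, err := vrpc.NewHandler("GreeterService", s)
    // handler, err := vrpc.NewHandlerOf(s)
    // handler, err := vrpc.NewHandlerFor[GreeterService](s)
    if err != nil {
        log.Fatal(err)
    }

    // Serve a single handler directly...
    log.Fatal(http.ListenAndServe(":8080", handler))

    // ...or combine several handlers in a Mux:
    // mux, err := vrpc.NewMux(handler1, handler2, handler3)
    // log.Fatal(http.ListenAndServe(":8080", mux))
}
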
Client

func main() {
    ctx := context.Background()

    c, err := vrpc.NewClient(vrpc.WithEndpoint("http://localhost:8080"))
    if err != nil {
        log.Fatal(err)
    }

    req := &HelloRequest{Name: "World"}

    // standard call
    resp := new(HelloResponse)
    err = c.Call(ctx, "GreeterService", "SayHello", req, resp)

    // or with a generic helper
    resp, err = vrpc.Call[HelloResponse](c, ctx, "GreeterService", "SayHello", req)

    // or with another generic helper
    resp, err = vrpc.CallFor[GreeterService, HelloResponse](c, ctx, "SayHello", req)

    // notify - does not wait for the server to process the request
    err = c.Notify(ctx, "GreeterService", "SayHello", req)

    // beacon - sends in the background, ignoring all but encoding errors
    err = c.Beacon(ctx, "GreeterService", "SayHello", req)
}

Server Streaming
func (s *Service) Method(context.Context, *Request, io.Writer) (*Response, error)

The data written to io.Writer is streamed to the client incrementally. After the method returns, the returned *Response is sent as the final message or, if an error is returned, the error is sent to the client.

The io.Writer provided to the method also implements:

interface { Flush() error }

Calling Flush forces buffered data to be sent immediately. Otherwise, data may be buffered to improve throughput. With frequent Flush, latency is minimized at the cost of throughput. The choice is left to the service implementation.
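
The latency-first choice can be sketched as follows. This is only an illustration: ProgressService, CountRequest and CountResponse are hypothetical names, and a *bufio.Writer stands in for the writer vrpc would pass, since it also has a Flush() error method.

```go
package main

import (
	"bufio"
	"bytes"
	"context"
	"fmt"
	"io"
)

// Hypothetical request/response types, for illustration only.
type CountRequest struct{ N int }
type CountResponse struct{ Sent int }

type ProgressService struct{}

// flusher matches the extra method the README says the writer implements.
type flusher interface{ Flush() error }

// Count streams N lines and flushes after each one to minimize latency.
func (s *ProgressService) Count(ctx context.Context, req *CountRequest, w io.Writer) (*CountResponse, error) {
	f, canFlush := w.(flusher)
	for i := 1; i <= req.N; i++ {
		if err := ctx.Err(); err != nil {
			return nil, err // respect cancellation
		}
		if _, err := fmt.Fprintf(w, "tick %d\n", i); err != nil {
			return nil, err
		}
		if canFlush {
			if err := f.Flush(); err != nil {
				return nil, err
			}
		}
	}
	return &CountResponse{Sent: req.N}, nil
}

// runCount invokes the method directly, without any transport.
func runCount(n int) (string, *CountResponse, error) {
	var sink bytes.Buffer
	w := bufio.NewWriter(&sink) // stand-in: *bufio.Writer has Flush() error
	resp, err := new(ProgressService).Count(context.Background(), &CountRequest{N: n}, w)
	return sink.String(), resp, err
}

func main() {
	out, resp, err := runCount(3)
	fmt.Printf("%q %+v %v\n", out, resp, err)
}
```

Dropping the Flush call inside the loop would leave it to the buffer to decide when data goes out, which is the throughput-first variant.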

On the client side, server streaming is consumed using:

err := client.ServerStream(ctx, "ServiceName", "Method", req, dst, resp)

dst receives streamed binary data, resp receives the final response.

Client Streaming
func (s *Service) Method(context.Context, *Request, io.Reader) (*Response, error)

The client uploads a stream of binary data, which is exposed to the service as an io.Reader. The initial request object is decoded before streaming begins. The service may read from the io.Reader incrementally. When the client finishes uploading, the service returns a response or an error.

On the client side, client streaming is performed using:

err := client.ClientStream(ctx, "ServiceName", "Method", req, src, resp)

src is read until EOF, resp receives the final response.

If the server returns before fully reading the stream, the client upload is aborted.

Context cancellation should be handled and respected by both sides.

Service Mesh Environments
// requests will be made to the provided endpoint
// with the Host header set to a service name (e.g. GreeterService)
c, err := vrpc.NewClient(
    vrpc.WithEndpoint("http://localhost:8080"), 
    vrpc.WithMode(vrpc.ServiceToHeader))


// requests will be made to the URL "<scheme>://<ServiceName>/<ServiceName>/<Method>"
// with the Host header set to a service name (e.g. GreeterService)
c, err := vrpc.NewClient(
    vrpc.WithMode(vrpc.ServiceToURL))
Client Options
func WithEndpoint(endpoint string) ClientOption
func WithClient(c *http.Client) ClientOption
func WithCodec(codec Codec) ClientOption
func WithMode(mode Mode) ClientOption
func WithScheme(scheme string) ClientOption
func WithPrefix(prefix string) ClientOption
Codecs

type Codec interface {
    Encode(w io.Writer, v any) error
    Decode(r io.Reader, v any) error
    NewEncoder(w io.Writer) Encoder
    NewDecoder(r io.Reader) Decoder
    ContentType() string
}

vrpc.RegisterCodec(c)

JSON, gob and msgpack are already included.

Interface Types
type Math struct {
    // ...
}

type MathService interface {
    Sum(context.Context, *SumRequest) (*SumResponse, error)
}

func main() {
    impl := new(Math)

    // service name is "CustomName"
    h, err := vrpc.NewHandler("CustomName", impl)

    // service name is "Math"
    h, err = vrpc.NewHandlerOf(impl)

    // service name is "MathService",
    // the handler exposes all suitable methods of the concrete implementation
    h, err = vrpc.NewHandlerFor[MathService](impl)

    // service name is "MathService",
    // the handler only exposes suitable methods of the MathService interface
    h, err = vrpc.NewStrictHandlerFor[MathService](impl)
}

Benchmarks

vrpc vs net/rpc

cpu: AMD Ryzen 9 5900HX with Radeon Graphics
BenchmarkVRPC_Call_HTTP-16          13016     90857 ns/op     15604 B/op    86 allocs/op
BenchmarkNetRPC_Call_HTTP-16        25706     46758 ns/op       496 B/op    15 allocs/op
BenchmarkVRPC_Call_InProc-16       205393      5643 ns/op     10944 B/op    31 allocs/op
BenchmarkNetRPC_Call_InProc-16     156754      7677 ns/op       514 B/op    16 allocs/op

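Over HTTP, vrpc is almost twice as slow as net/rpc (90857 vs 46758 ns/op) and allocates far more (86 vs 15 allocs/op). Most allocations come from net/http. In-process, vrpc is faster (5643 vs 7677 ns/op).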

Running the same benchmarks with a fasthttp backend (not published here) yields better results:

cpu: AMD Ryzen 9 5900HX with Radeon Graphics
BenchmarkVRPC_Call_HTTP-16          24279     42743 ns/op       334 B/op    13 allocs/op
BenchmarkNetRPC_Call_HTTP-16        25471     48041 ns/op       496 B/op    15 allocs/op
BenchmarkVRPC_Call_Direct-16       391231      2854 ns/op       489 B/op    22 allocs/op
BenchmarkNetRPC_Call_Direct-16     152834      7723 ns/op       513 B/op    16 allocs/op

However, this comes at the cost of breaking compatibility with the standard library.
After weighing the trade-offs, I decided to stick with net/http.

Internals
  • URL:
    • POST <endpoint>/<ServiceName>/<MethodName>
    • POST <scheme>://<ServiceName>/<ServiceName>/<MethodName> with ServiceToURL
  • Headers:
    • Content-Type - from the codec (application/json, application/gob, etc.)
    • X-Vrpc-Err - error message when the call failed
    • X-Vrpc - call mode
  • Streaming:
    • Streaming is implemented on top of HTTP using a framed protocol.
    • Frame types:
      • 1 - message (request or response value)
      • 2 - binary (streamed data)
      • 3 - ping (keepalive)
      • 4 - error (end of stream)
      • 0 - final (end of stream)
    • The framing format is internal and not part of the user API, but it allows custom implementations in other languages. All frames are encoded using the same (requested or default) codec as the request. The frame marker is a single byte, also encoded by the codec. Message (1) and binary (2) markers are followed by the encoded data.
    • Keepalive (pings):
      • Ping frames are sent only when there is no recent activity.
      • Ping frames prevent idle connections from being closed by proxies.
      • Ping traffic is suppressed when the stream is actively used.
      • Receivers always ignore ping frames.
      • Sending policy is implementation-defined.
    • Streaming writers are not safe for concurrent use.
    • Streaming output is buffered:
      • Writes without Flush prioritize throughput.
      • Explicit Flush prioritizes latency.
      • HTTP transport may also flush automatically based on buffer size.
    • Streaming using JSON as a codec is inefficient.
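
The frame handling described above can be sketched roughly as follows. This is not the library's implementation: it assumes encoding/json as the codec, assumes the response message precedes the final frame, and assumes an error frame carries no payload (the text does not say). All function names are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
)

// Frame markers as listed above.
const (
	frameFinal   uint8 = 0
	frameMessage uint8 = 1
	frameBinary  uint8 = 2
	framePing    uint8 = 3
	frameError   uint8 = 4
)

type HelloResponse struct{ Greeting string }

// writeFrame encodes the marker and, for message/binary frames, the payload.
func writeFrame(enc *json.Encoder, marker uint8, payload any) error {
	if err := enc.Encode(marker); err != nil {
		return err
	}
	if marker == frameMessage || marker == frameBinary {
		return enc.Encode(payload)
	}
	return nil
}

// readStream copies binary frames into dst, decodes the message frame into
// resp, skips pings, and stops at a final or error frame.
func readStream(r io.Reader, dst io.Writer, resp any) error {
	dec := json.NewDecoder(r)
	for {
		var marker uint8
		if err := dec.Decode(&marker); err != nil {
			return err
		}
		switch marker {
		case frameBinary:
			var chunk []byte
			if err := dec.Decode(&chunk); err != nil {
				return err
			}
			if _, err := dst.Write(chunk); err != nil {
				return err
			}
		case frameMessage:
			if err := dec.Decode(resp); err != nil {
				return err
			}
		case framePing:
			// Receivers always ignore ping frames.
		case frameFinal:
			return nil
		case frameError:
			// Assumption: no payload follows; only the fact is reported.
			return errors.New("stream ended with an error frame")
		default:
			return fmt.Errorf("unknown frame marker %d", marker)
		}
	}
}

// demo writes a small stream into a buffer and reads it back.
func demo() (string, HelloResponse, error) {
	var wire bytes.Buffer
	enc := json.NewEncoder(&wire)
	writeFrame(enc, frameBinary, []byte("hello, "))
	writeFrame(enc, framePing, nil)
	writeFrame(enc, frameBinary, []byte("world"))
	writeFrame(enc, frameMessage, HelloResponse{Greeting: "done"})
	writeFrame(enc, frameFinal, nil)

	var data bytes.Buffer
	var resp HelloResponse
	err := readStream(&wire, &data, &resp)
	return data.String(), resp, err
}

func main() {
	data, resp, err := demo()
	fmt.Println(data, resp.Greeting, err)
}
```

With JSON, each binary chunk is base64-encoded on the wire, which is one reason JSON streaming is inefficient.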

Documentation

Index

Constants

const (
	// ErrorHeader is the HTTP header key used to report error messages from the server.
	ErrorHeader = "X-Vrpc-Err"
	// ProtoHeader is the HTTP header key used to specify the RPC call mode.
	ProtoHeader = "X-Vrpc"

	// StatusEncodingError is returned by server when it failed to encode the response.
	StatusEncodingError = 567
)

Variables

var (
	ErrNotFound = &Error{"not found"}
	ErrNoCodec  = &Error{"codec not supported"}
)

Functions

func Call

func Call[T any](c *Client, ctx context.Context, service, method string, req any) (*T, error)

Call is a generic helper function that invokes a method on the Client and returns result as *T.

func CallFor

func CallFor[S any, T any](c *Client, ctx context.Context, method string, req any) (*T, error)

CallFor is a generic helper function that invokes a method on the Client using S type as a service name and returns result as *T.

func ClientStream added in v0.2.2

func ClientStream[T any](c *Client, ctx context.Context, service, method string, req any, src io.Reader) (*T, error)

ClientStream is a generic helper function that invokes a client streaming method on the Client and returns result as *T.

func ClientStreamFor added in v0.2.2

func ClientStreamFor[S any, T any](c *Client, ctx context.Context, method string, req any, src io.Reader) (*T, error)

ClientStreamFor is a generic helper function that invokes a client streaming method on the Client using S type as a service name and returns result as *T.

func RegisterCodec

func RegisterCodec(codec Codec)

RegisterCodec adds a codec to the global list of server codecs. It is not safe for concurrent use and must be called before any Handler starts serving requests.

func ServerStream added in v0.2.2

func ServerStream[T any](c *Client, ctx context.Context, service, method string, req any, dst io.Writer) (*T, error)

ServerStream is a generic helper function that invokes a server streaming method on the Client and returns result as *T.

func ServerStreamFor added in v0.2.2

func ServerStreamFor[S any, T any](c *Client, ctx context.Context, method string, req any, dst io.Writer) (*T, error)

ServerStreamFor is a generic helper function that invokes a server streaming method on the Client using S type as a service name and returns result as *T.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is an RPC client.

func NewClient

func NewClient(options ...ClientOption) (*Client, error)

NewClient constructs a Client using the provided options. It validates the resulting configuration and returns an error if it is not valid.

func (*Client) Beacon

func (c *Client) Beacon(ctx context.Context, service, method string, request any) error

Beacon sends the request, but does not check the result or whether the request reached the server. Errors are returned only if encoding fails.

func (*Client) Call

func (c *Client) Call(ctx context.Context, service, method string, request any, response any) error

Call executes a synchronous RPC method. It sends the request, waits for the server to process it, and decodes the body into the response.

func (*Client) ClientStream added in v0.2.0

func (c *Client) ClientStream(ctx context.Context, service, method string, request any, src io.Reader, response any) error

ClientStream calls an RPC method that streams data from the client to the server.

Server implementation receives an io.Reader and may read the stream until EOF, then returns a regular unary response which is decoded into response.

End-of-stream is signaled by src returning io.EOF. If you want to stream data incrementally (e.g. transfer objects), use an io.Pipe (or a custom io.Reader) and close the writer side to signal EOF.
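
A minimal sketch of the io.Pipe approach, using only the standard library. The produce and consume helpers are hypothetical; consume stands in for the call that drains src, which in real code would be ClientStream.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// produce returns a reader whose data is written incrementally by a goroutine.
// Closing the pipe writer is what the reading side observes as io.EOF.
func produce(chunks []string) io.Reader {
	pr, pw := io.Pipe()
	go func() {
		for _, c := range chunks {
			if _, err := io.WriteString(pw, c); err != nil {
				// The reading side went away (e.g. the request was torn down).
				pw.CloseWithError(err)
				return
			}
		}
		pw.Close() // signals EOF to the reader
	}()
	return pr
}

// consume stands in for the call that reads src until EOF.
func consume(src io.Reader) (string, error) {
	var sb strings.Builder
	_, err := io.Copy(&sb, src)
	return sb.String(), err
}

func main() {
	src := produce([]string{"chunk-1;", "chunk-2;", "chunk-3;"})
	// With vrpc, src would be passed as the src argument of ClientStream.
	got, err := consume(src)
	fmt.Println(got, err)
}
```

Each write to the pipe blocks until the reader has consumed it, so the producer is naturally paced by the upload.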

Context cancellation aborts the request. The upload goroutine will eventually stop when the request is torn down; src may observe read errors.

The upload path is buffered; explicit flushing is not exposed.

Transport and protocol errors are returned as *vrpc.Error. Service-level failures are returned as a regular error (not wrapped). On upload errors, the returned error reflects the upload failure even if the server already replied (best-effort error propagation).

func (*Client) Notify

func (c *Client) Notify(ctx context.Context, service, method string, request any) error

Notify sends the request, but does not wait for the server to process it. Errors are returned if encoding failed or if the server was unable to decode the request.

func (*Client) ServerStream added in v0.2.0

func (c *Client) ServerStream(ctx context.Context, service, method string, request any, dst io.Writer, response any) error

ServerStream calls an RPC method that streams data from the server to the client.

The server implementation receives an io.Writer and may write the streamed payload to it. On the client side, all streamed bytes are forwarded into dst.

Context cancellation aborts the request. dst may observe a write error originating from request cancellation.

dst receives bytes as they arrive, but actual network delivery depends on server flushing. The server-side writer supports optional explicit Flush to prioritize latency; otherwise buffering may increase throughput.

Transport and protocol errors are returned as *vrpc.Error. Service-level failures are returned as a regular error (not wrapped).

If an error occurs, dst may have already received a prefix of the stream.

type ClientOption

type ClientOption func(*clientConfig)

ClientOption configures a Client in NewClient.

func WithClient

func WithClient(hc *http.Client) ClientOption

WithClient sets the underlying HTTP client (transport/TLS/etc.). If not set, a default client is created with MaxIdleConnsPerHost set to 100.

func WithCodec

func WithCodec(codec Codec) ClientOption

WithCodec sets the RPC codec. If not set, msgpack is used.

func WithEndpoint

func WithEndpoint(endpoint string) ClientOption

WithEndpoint sets the fixed endpoint dial target (scheme://host[:port][/prefix]). Required for default mode and for ServiceToHeader; forbidden for ServiceToURL.

func WithMode

func WithMode(mode Mode) ClientOption

WithMode sets routing mode. If not set, ModeDefault is used.

func WithPrefix

func WithPrefix(prefix string) ClientOption

WithPrefix sets a path prefix (e.g. "/prefix"). If Endpoint has a path component and WithPrefix is set, WithPrefix overrides the endpoint path.

func WithScheme

func WithScheme(scheme string) ClientOption

WithScheme sets scheme used only in ServiceToURL mode (default: http).

type Codec

type Codec interface {
	Encode(w io.Writer, v any) error
	Decode(r io.Reader, v any) error
	NewEncoder(w io.Writer) Encoder
	NewDecoder(r io.Reader) Decoder
	ContentType() string
}

Codec defines the interface for encoding and decoding RPC messages.

type Decoder added in v0.2.0

type Decoder interface{ Decode(any) error }

type Encoder added in v0.2.0

type Encoder interface{ Encode(any) error }
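
As an illustration of what a custom codec could look like, here is a sketch built on encoding/json. The three interfaces are re-declared locally so the example compiles on its own; real code would implement vrpc.Codec and return vrpc.Encoder and vrpc.Decoder. The indentJSON type and its media type are made up for this example.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// Local copies of the documented interfaces so the sketch is self-contained.
type Encoder interface{ Encode(any) error }
type Decoder interface{ Decode(any) error }

type Codec interface {
	Encode(w io.Writer, v any) error
	Decode(r io.Reader, v any) error
	NewEncoder(w io.Writer) Encoder
	NewDecoder(r io.Reader) Decoder
	ContentType() string
}

// indentJSON is a hypothetical codec that emits indented JSON.
type indentJSON struct{}

func (indentJSON) NewEncoder(w io.Writer) Encoder {
	enc := json.NewEncoder(w)
	enc.SetIndent("", "  ")
	return enc // *json.Encoder has Encode(any) error
}

func (indentJSON) NewDecoder(r io.Reader) Decoder { return json.NewDecoder(r) }

func (c indentJSON) Encode(w io.Writer, v any) error { return c.NewEncoder(w).Encode(v) }
func (c indentJSON) Decode(r io.Reader, v any) error { return c.NewDecoder(r).Decode(v) }

// ContentType returns a made-up media type that would select this codec.
func (indentJSON) ContentType() string { return "application/x-indent-json" }

// Compile-time check that indentJSON satisfies the interface.
var _ Codec = indentJSON{}

type HelloRequest struct{ Name string }

// roundTrip encodes a request and decodes it back through the codec.
func roundTrip(c Codec, in HelloRequest) (HelloRequest, error) {
	var buf bytes.Buffer
	var out HelloRequest
	if err := c.Encode(&buf, &in); err != nil {
		return out, err
	}
	err := c.Decode(&buf, &out)
	return out, err
}

func main() {
	out, err := roundTrip(indentJSON{}, HelloRequest{Name: "World"})
	fmt.Println(out.Name, err)
}
```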

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error type is returned for RPC errors (transport, encoding, decoding). Errors returned by the service implementation are not wrapped by this type.

func (*Error) Error

func (e *Error) Error() string

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler serves RPC requests for a specific service implementation.

func Def

func Def[T any](impl any) *Handler

Def is an alias for NewHandlerFor that panics if an error occurs.

func DefService

func DefService(service string, impl any) *Handler

DefService is an alias for NewHandler that panics if an error occurs.

func NewHandler

func NewHandler(service string, impl any) (*Handler, error)

NewHandler creates a new Handler for the given implementation, using a user-provided service name. It reflects over impl to find suitable methods. A suitable method must have one of the following signatures:

Method(context.Context, *Request) (*Response, error)  // unary
Method(context.Context, *Request, io.Writer) (*Response, error)  // download/stream
Method(context.Context, *Request, io.Reader) (*Response, error)  // upload/stream

func NewHandlerFor

func NewHandlerFor[T any](impl any) (*Handler, error)

NewHandlerFor creates a new Handler for the given implementation, using type T to determine the service name.

If T is an interface, impl must implement it. However, even when T is an interface, all suitable methods found on the concrete implementation type are exposed by the handler, not only the methods declared in the interface T.

func NewHandlerOf

func NewHandlerOf(impl any) (*Handler, error)

NewHandlerOf creates a new Handler for the given implementation, inferring the service name from the implementation type.

func NewStrictHandlerFor

func NewStrictHandlerFor[T any](impl any) (*Handler, error)

NewStrictHandlerFor creates a new Handler for the given implementation, using type T as a service name and a contract that restricts which methods are exposed.

Only methods that are present on T and satisfy the required signature are exposed.

The implementation must fully implement the RPC method subset of T: for every suitable method on the contract type, impl must have a method with the same name and a compatible signature. Otherwise, an error is returned.

If T is an interface, impl must also implement T.

func (*Handler) ServeHTTP

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)

type Mode

type Mode uint8

Mode controls how the client routes requests.

const (
	// ModeDefault means: connect to Endpoint, send Host header equal to Endpoint host.
	ModeDefault Mode = iota

	// ServiceToHeader means: connect to Endpoint, but send Host header equal to service name.
	// Useful for sidecars/proxies that route by Host/authority.
	ServiceToHeader

	// ServiceToURL means: connect directly to http(s)://<service>/..., and also send Host header = service.
	// Endpoint must not be set in this mode.
	ServiceToURL
)
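
The effect of each mode on the request URL and Host header can be summarized with a small sketch. The target function is hypothetical and only mirrors the documented behavior; prefix handling and validation are left out.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strings"
)

// Local copy of the documented constants so the sketch is self-contained.
type Mode uint8

const (
	ModeDefault Mode = iota
	ServiceToHeader
	ServiceToURL
)

// target returns the request URL and the Host header for a call.
func target(mode Mode, endpoint, scheme, service, method string) (reqURL, host string, err error) {
	if mode == ServiceToURL {
		if scheme == "" {
			scheme = "http" // documented default
		}
		return fmt.Sprintf("%s://%s/%s/%s", scheme, service, service, method), service, nil
	}
	u, err := url.Parse(endpoint)
	if err != nil {
		return "", "", err
	}
	if u.Host == "" {
		return "", "", errors.New("endpoint must include a host")
	}
	reqURL = strings.TrimRight(endpoint, "/") + "/" + service + "/" + method
	host = u.Host
	if mode == ServiceToHeader {
		host = service // route by Host/authority, still dial the endpoint
	}
	return reqURL, host, nil
}

func main() {
	for _, m := range []Mode{ModeDefault, ServiceToHeader, ServiceToURL} {
		u, h, err := target(m, "http://localhost:8080", "", "GreeterService", "SayHello")
		fmt.Println(u, h, err)
	}
}
```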

type Mux

type Mux struct {
	// contains filtered or unexported fields
}

Mux is a multiplexer that routes requests to specific Handlers based on the service name.

func NewMux

func NewMux(handlers ...*Handler) (*Mux, error)

NewMux creates a new Mux with the provided list of Handlers. It returns an error if duplicate service names are detected.

func (*Mux) Add

func (m *Mux) Add(handler *Handler) error

Add registers a new Handler with the Mux. It returns an error if a service with the same name already exists. It is not safe to call Add concurrently after the Mux has started serving requests.

func (*Mux) ServeHTTP

func (m *Mux) ServeHTTP(w http.ResponseWriter, r *http.Request)
