grpcws

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2026 License: Apache-2.0 Imports: 12 Imported by: 1

README

grpcws - gRPC-over-WebSocket

The grpcws package provides WebSocket transport for gRPC streaming RPCs with full lifecycle management, heartbeat detection, and graceful error handling.

Overview

This package bridges the gap between gRPC streaming and browser-based WebSocket clients. It wraps gRPC client streams and exposes them over WebSocket with:

  • All three streaming modes: Server, Client, and Bidirectional streaming
  • JSON envelope protocol: Human-readable message format with protojson payloads
  • Full lifecycle management: Connection tracking, heartbeat ping-pong, graceful shutdown
  • Metrics tracking: Message counts, connection duration, and more

Installation

import "github.com/panyam/servicekit/grpcws"

Quick Start

Server Streaming

For RPCs like rpc Subscribe(Request) returns (stream Response):

router.HandleFunc("/ws/v1/subscribe", gohttp.WSServe(
    grpcws.NewServerStreamHandler(
        // Create the gRPC stream from a request
        func(ctx context.Context, req *pb.SubscribeRequest) (pb.GameService_SubscribeClient, error) {
            return grpcClient.Subscribe(ctx, req)
        },
        // Parse the initial request from HTTP
        func(r *http.Request) (*pb.SubscribeRequest, error) {
            return &pb.SubscribeRequest{
                GameId: mux.Vars(r)["game_id"],
            }, nil
        },
    ),
    nil,
))
Client Streaming

For RPCs like rpc SendBatch(stream Request) returns (Response):

router.HandleFunc("/ws/v1/commands", gohttp.WSServe(
    grpcws.NewClientStreamHandler(
        // Create the gRPC stream
        func(ctx context.Context) (pb.GameService_SendCommandsClient, error) {
            return grpcClient.SendCommands(ctx)
        },
        // Factory for new request messages
        func() *pb.GameCommand { return &pb.GameCommand{} },
    ),
    nil,
))
Bidirectional Streaming

For RPCs like rpc Chat(stream Request) returns (stream Response):

router.HandleFunc("/ws/v1/sync", gohttp.WSServe(
    grpcws.NewBidiStreamHandler(
        // Create the gRPC stream
        func(ctx context.Context) (pb.GameService_SyncGameClient, error) {
            return grpcClient.SyncGame(ctx)
        },
        // Factory for new request messages
        func() *pb.PlayerAction { return &pb.PlayerAction{} },
    ),
    nil,
))

Message Protocol

All messages use a JSON envelope format for consistency and easy debugging.

Server → Client Messages
// Data message with protojson payload
{"type": "data", "data": {"field": "value", ...}}

// Error message
{"type": "error", "error": "error message"}

// Stream completed
{"type": "stream_end"}

// Heartbeat ping
{"type": "ping", "pingId": 123}
Client → Server Messages
// Data message with protojson payload
{"type": "data", "data": {"field": "value", ...}}

// Heartbeat response
{"type": "pong", "pingId": 123}

// Request stream cancellation
{"type": "cancel"}

// Half-close (client done sending, for client/bidi streaming)
{"type": "end_send"}

Architecture

Stream Interfaces

The package defines generic interfaces that match gRPC streaming patterns:

// ServerStream - for server streaming RPCs
type ServerStream[Resp proto.Message] interface {
    Recv() (Resp, error)
    grpc.ClientStream
}

// ClientStream - for client streaming RPCs
type ClientStream[Req, Resp proto.Message] interface {
    Send(Req) error
    CloseAndRecv() (Resp, error)
    grpc.ClientStream
}

// BidiStream - for bidirectional streaming RPCs
type BidiStream[Req, Resp proto.Message] interface {
    Send(Req) error
    Recv() (Resp, error)
    CloseSend() error
    grpc.ClientStream
}
Connection Types

Each streaming pattern has a corresponding connection type:

Type Pattern Client Can Server Can
ServerStreamConn Server Streaming Control (cancel, pong) Send data, ping
ClientStreamConn Client Streaming Send data, end_send Respond once
BidiStreamConn Bidirectional Send data, end_send Send data
Handler Types

Handlers validate HTTP requests and create connections:

type ServerStreamHandler[Req, Resp proto.Message, Stream ServerStream[Resp]] struct {
    CreateStream func(ctx context.Context, req Req) (Stream, error)
    ParseRequest func(r *http.Request) (Req, error)
}

type ClientStreamHandler[Req, Resp proto.Message, Stream ClientStream[Req, Resp]] struct {
    CreateStream func(ctx context.Context) (Stream, error)
    NewRequest   func() Req
}

type BidiStreamHandler[Req, Resp proto.Message, Stream BidiStream[Req, Resp]] struct {
    CreateStream func(ctx context.Context) (Stream, error)
    NewRequest   func() Req
}

Configuration

Custom Protojson Options

Configure protojson marshaling/unmarshaling:

handler := &grpcws.ServerStreamHandler[*pb.Req, *pb.Resp, pb.MyService_StreamClient]{
    CreateStream: createStreamFunc,
    ParseRequest: parseRequestFunc,
    MarshalOptions: protojson.MarshalOptions{
        EmitUnpopulated: true,
        UseProtoNames:   true,
    },
    UnmarshalOptions: protojson.UnmarshalOptions{
        DiscardUnknown: true,
    },
}
WebSocket Configuration

Configure the underlying WebSocket via WSConnConfig:

config := &gohttp.WSConnConfig{
    BiDirStreamConfig: &gohttp.BiDirStreamConfig{
        PingPeriod: 25 * time.Second,
        PongPeriod: 60 * time.Second,
    },
}

router.HandleFunc("/ws/v1/stream", gohttp.WSServe(handler, config))

Metrics

Each connection tracks metrics via StreamMetrics:

type StreamMetrics struct {
    ConnectedAt  time.Time
    MsgsSent     int64
    MsgsReceived int64
}

Access metrics in custom connection implementations or via logging.

Error Handling

Stream Errors

gRPC stream errors are automatically forwarded to the WebSocket client:

{"type": "error", "error": "rpc error: code = NotFound desc = game not found"}
Cancellation

Clients can cancel streams by sending:

{"type": "cancel"}

This triggers context cancellation on the gRPC stream.

Half-Close

For client and bidi streaming, clients signal they're done sending:

{"type": "end_send"}

Example: Complete Setup

package main

import (
    "context"
    "log"
    "net/http"

    "github.com/gorilla/mux"
    "github.com/panyam/servicekit/grpcws"
    gohttp "github.com/panyam/servicekit/http"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"

    pb "your/proto/package"
)

func main() {
    // Connect to gRPC server
    conn, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := pb.NewGameServiceClient(conn)
    router := mux.NewRouter()

    // WebSocket endpoints
    ws := router.PathPrefix("/ws").Subrouter()

    // Server streaming
    ws.HandleFunc("/v1/games/{game_id}/subscribe", gohttp.WSServe(
        grpcws.NewServerStreamHandler(
            func(ctx context.Context, req *pb.SubscribeRequest) (pb.GameService_SubscribeClient, error) {
                return client.Subscribe(ctx, req)
            },
            func(r *http.Request) (*pb.SubscribeRequest, error) {
                return &pb.SubscribeRequest{GameId: mux.Vars(r)["game_id"]}, nil
            },
        ),
        nil,
    ))

    // Client streaming
    ws.HandleFunc("/v1/games/{game_id}/commands", gohttp.WSServe(
        grpcws.NewClientStreamHandler(
            func(ctx context.Context) (pb.GameService_SendCommandsClient, error) {
                return client.SendCommands(ctx)
            },
            func() *pb.GameCommand { return &pb.GameCommand{} },
        ),
        nil,
    ))

    // Bidirectional streaming
    ws.HandleFunc("/v1/games/{game_id}/sync", gohttp.WSServe(
        grpcws.NewBidiStreamHandler(
            func(ctx context.Context) (pb.GameService_SyncGameClient, error) {
                return client.SyncGame(ctx)
            },
            func() *pb.PlayerAction { return &pb.PlayerAction{} },
        ),
        nil,
    ))

    log.Println("Starting server on :8080")
    log.Fatal(http.ListenAndServe(":8080", router))
}

Demo

See cmd/grpcws-demo/ for a complete working example with:

  • Real proto files (pb/game.proto)
  • Buf configuration for code generation
  • Mock gRPC streams for testing
  • Browser-based test UI

Run the demo:

go run ./cmd/grpcws-demo
# Open http://localhost:8080

Comparison with grpc-gateway

Feature grpc-gateway (SSE) grpcws
Server Streaming Yes Yes
Client Streaming No Yes
Bidirectional No Yes
Heartbeat No Yes (ping-pong)
Timeout Detection Limited Full support
Cancellation Limited Full support
Protocol SSE/REST WebSocket

Use grpc-gateway for simple REST + server streaming. Use grpcws when you need client streaming, bidirectional streaming, or robust connection management.

Documentation

Overview

Package grpcws provides WebSocket transport for gRPC streaming RPCs.

This package bridges gRPC streaming with WebSocket connections, providing:

  • Full lifecycle management (ping/pong, timeouts, graceful shutdown)
  • Support for all streaming modes (server, client, bidirectional)
  • Type-safe message handling via generics
  • Coexistence with grpc-gateway (separate route prefixes)

Why Use gRPC-over-WebSocket?

While grpc-gateway provides SSE (Server-Sent Events) for streaming, WebSocket offers:

  • Bidirectional communication (SSE is server-to-client only)
  • Heartbeat/ping-pong for dead connection detection
  • Connection lifecycle hooks for proper resource management
  • Better proxy/load balancer compatibility

Streaming Modes

Server Streaming (rpc Foo(Req) returns (stream Resp)):

handler := grpcws.NewServerStreamHandler(
    func(ctx context.Context, req *pb.SubscribeRequest) (pb.Service_SubscribeClient, error) {
        return client.Subscribe(ctx, req)
    },
    parseRequest,
)
router.HandleFunc("/ws/v1/subscribe", gohttp.WSServe(handler, nil))

Client Streaming (rpc Foo(stream Req) returns (Resp)):

handler := grpcws.NewClientStreamHandler(
    func(ctx context.Context) (pb.Service_SendClient, error) {
        return client.Send(ctx)
    },
    parseMessage,
)

Bidirectional Streaming (rpc Foo(stream Req) returns (stream Resp)):

handler := grpcws.NewBidiStreamHandler(
    func(ctx context.Context) (pb.Service_SyncClient, error) {
        return client.Sync(ctx)
    },
    parseMessage,
)

Message Protocol

All messages use a JSON envelope for control:

// Server → Client
{"type": "data", "data": <payload>}      // Stream message
{"type": "error", "error": "msg"}         // Error
{"type": "stream_end"}                    // Stream completed
{"type": "ping", "pingId": 1}            // Heartbeat

// Client → Server
{"type": "data", "data": <payload>}      // Send message (client/bidi streaming)
{"type": "pong", "pingId": 1}            // Heartbeat response
{"type": "cancel"}                        // Cancel stream
{"type": "end_send"}                      // Half-close (client done sending)

Route Registration

Use a separate route prefix to coexist with grpc-gateway:

// WebSocket on /ws/v1/...
wsRouter := router.PathPrefix("/ws").Subrouter()
wsRouter.HandleFunc("/v1/games/{id}/subscribe", gohttp.WSServe(handler, nil))

// grpc-gateway on /v1/... (REST/SSE)
router.PathPrefix("/v1/").Handler(gwMux)

Index

Constants

View Source
const (
	TypeData      = "data"
	TypeError     = "error"
	TypeStreamEnd = "stream_end"
	TypePing      = "ping"
	TypePong      = "pong"
	TypeCancel    = "cancel"
	TypeEndSend   = "end_send"
)

MessageType constants for the JSON envelope

Variables

This section is empty.

Functions

This section is empty.

Types

type BidiStream

type BidiStream[Req proto.Message, Resp proto.Message] interface {
	Send(Req) error
	Recv() (Resp, error)
	CloseSend() error
	grpc.ClientStream
}

BidiStream is the interface for bidirectional streaming gRPC clients

type BidiStreamConn

type BidiStreamConn[Req proto.Message, Resp proto.Message, Stream BidiStream[Req, Resp]] struct {
	// contains filtered or unexported fields
}

BidiStreamConn handles bidirectional streaming RPCs over WebSocket. Both client and server send multiple messages concurrently.

RPC pattern: rpc SyncGame(stream Request) returns (stream Response)

func (*BidiStreamConn[Req, Resp, Stream]) HandleMessage

func (c *BidiStreamConn[Req, Resp, Stream]) HandleMessage(msg ControlMessage) error

HandleMessage processes messages from the client

func (*BidiStreamConn[Req, Resp, Stream]) OnClose

func (c *BidiStreamConn[Req, Resp, Stream]) OnClose()

OnClose cancels the stream and cleans up

func (*BidiStreamConn[Req, Resp, Stream]) OnStart

func (c *BidiStreamConn[Req, Resp, Stream]) OnStart(conn *websocket.Conn) error

OnStart initializes the connection and starts forwarding responses

func (*BidiStreamConn) SendPing

func (c *BidiStreamConn) SendPing() error

SendPing sends a ping using the gRPC-WS ControlMessage envelope format. This overrides BaseConn.SendPing to use the proper envelope protocol.

type BidiStreamHandler

type BidiStreamHandler[Req proto.Message, Resp proto.Message, Stream BidiStream[Req, Resp]] struct {
	// CreateStream creates the gRPC stream
	CreateStream func(ctx context.Context) (Stream, error)

	// NewRequest creates a new request message instance (for decoding)
	NewRequest func() Req

	// MarshalOptions for protojson encoding (optional)
	MarshalOptions protojson.MarshalOptions

	// UnmarshalOptions for protojson decoding (optional)
	UnmarshalOptions protojson.UnmarshalOptions
}

BidiStreamHandler creates BidiStreamConn instances for incoming connections.

func NewBidiStreamHandler

func NewBidiStreamHandler[Req proto.Message, Resp proto.Message, Stream BidiStream[Req, Resp]](
	createStream func(ctx context.Context) (Stream, error),
	newRequest func() Req,
) *BidiStreamHandler[Req, Resp, Stream]

NewBidiStreamHandler creates a handler for bidirectional streaming RPCs.

Example:

handler := grpcws.NewBidiStreamHandler(
    func(ctx context.Context) (pb.GameService_SyncGameClient, error) {
        return client.SyncGame(ctx)
    },
    func() *pb.PlayerAction { return &pb.PlayerAction{} },
)

func (*BidiStreamHandler[Req, Resp, Stream]) Validate

func (h *BidiStreamHandler[Req, Resp, Stream]) Validate(
	w http.ResponseWriter,
	r *http.Request,
) (*BidiStreamConn[Req, Resp, Stream], bool)

Validate implements WSHandler. Creates the gRPC stream.

type ClientStream

type ClientStream[Req proto.Message, Resp proto.Message] interface {
	Send(Req) error
	CloseAndRecv() (Resp, error)
	grpc.ClientStream
}

ClientStream is the interface for client-streaming gRPC clients

type ClientStreamConn

type ClientStreamConn[Req proto.Message, Resp proto.Message, Stream ClientStream[Req, Resp]] struct {
	// contains filtered or unexported fields
}

ClientStreamConn handles client-streaming RPCs over WebSocket. The client sends multiple messages; server responds once at the end.

RPC pattern: rpc SendCommands(stream Request) returns (Response)

func (*ClientStreamConn[Req, Resp, Stream]) HandleMessage

func (c *ClientStreamConn[Req, Resp, Stream]) HandleMessage(msg ControlMessage) error

HandleMessage processes messages from the client

func (*ClientStreamConn[Req, Resp, Stream]) OnClose

func (c *ClientStreamConn[Req, Resp, Stream]) OnClose()

OnClose cancels the stream and cleans up

func (*ClientStreamConn[Req, Resp, Stream]) OnStart

func (c *ClientStreamConn[Req, Resp, Stream]) OnStart(conn *websocket.Conn) error

OnStart initializes the connection

func (*ClientStreamConn) SendPing

func (c *ClientStreamConn) SendPing() error

SendPing sends a ping using the gRPC-WS ControlMessage envelope format. This overrides BaseConn.SendPing to use the proper envelope protocol.

type ClientStreamHandler

type ClientStreamHandler[Req proto.Message, Resp proto.Message, Stream ClientStream[Req, Resp]] struct {
	// CreateStream creates the gRPC stream
	CreateStream func(ctx context.Context) (Stream, error)

	// NewRequest creates a new request message instance (for decoding)
	NewRequest func() Req

	// MarshalOptions for protojson encoding (optional)
	MarshalOptions protojson.MarshalOptions

	// UnmarshalOptions for protojson decoding (optional)
	UnmarshalOptions protojson.UnmarshalOptions
}

ClientStreamHandler creates ClientStreamConn instances for incoming connections.

func NewClientStreamHandler

func NewClientStreamHandler[Req proto.Message, Resp proto.Message, Stream ClientStream[Req, Resp]](
	createStream func(ctx context.Context) (Stream, error),
	newRequest func() Req,
) *ClientStreamHandler[Req, Resp, Stream]

NewClientStreamHandler creates a handler for client-streaming RPCs.

Example:

handler := grpcws.NewClientStreamHandler(
    func(ctx context.Context) (pb.GameService_SendCommandsClient, error) {
        return client.SendCommands(ctx)
    },
    func() *pb.GameCommand { return &pb.GameCommand{} },
)

func (*ClientStreamHandler[Req, Resp, Stream]) Validate

func (h *ClientStreamHandler[Req, Resp, Stream]) Validate(
	w http.ResponseWriter,
	r *http.Request,
) (*ClientStreamConn[Req, Resp, Stream], bool)

Validate implements WSHandler. Creates the gRPC stream.

type ControlMessage

type ControlMessage struct {
	Type   string `json:"type"`
	Data   any    `json:"data,omitempty"`
	Error  string `json:"error,omitempty"`
	PingId int64  `json:"pingId,omitempty"`
}

ControlMessage represents the JSON envelope for all WebSocket messages

type GRPCWSCodec

type GRPCWSCodec[Req proto.Message, Resp proto.Message] struct {
	// NewRequest creates a new request message instance
	NewRequest func() Req

	// MarshalOptions for protojson encoding
	MarshalOptions protojson.MarshalOptions

	// UnmarshalOptions for protojson decoding
	UnmarshalOptions protojson.UnmarshalOptions
}

GRPCWSCodec handles encoding/decoding of gRPC messages over WebSocket. It uses a JSON envelope for control messages with protojson payloads.

func (*GRPCWSCodec[Req, Resp]) Decode

func (c *GRPCWSCodec[Req, Resp]) Decode(data []byte, msgType gohttp.MessageType) (ControlMessage, error)

Decode parses a control message from raw WebSocket data

func (*GRPCWSCodec[Req, Resp]) DecodeRequest

func (c *GRPCWSCodec[Req, Resp]) DecodeRequest(msg ControlMessage) (Req, error)

DecodeRequest parses a request from a control message's data field

func (*GRPCWSCodec[Req, Resp]) Encode

func (c *GRPCWSCodec[Req, Resp]) Encode(msg ControlMessage) ([]byte, gohttp.MessageType, error)

Encode wraps a response in a control message envelope

func (*GRPCWSCodec[Req, Resp]) EncodeData

func (c *GRPCWSCodec[Req, Resp]) EncodeData(resp Resp) (ControlMessage, error)

EncodeData wraps a proto response in a data message

type ServerStream

type ServerStream[Resp proto.Message] interface {
	Recv() (Resp, error)
	grpc.ClientStream
}

ServerStream is the interface for server-streaming gRPC clients

type ServerStreamConn

type ServerStreamConn[Req proto.Message, Resp proto.Message, Stream ServerStream[Resp]] struct {
	// contains filtered or unexported fields
}

ServerStreamConn handles server-streaming RPCs over WebSocket. The server sends multiple messages; client receives and can send control messages.

RPC pattern: rpc Subscribe(Request) returns (stream Response)

func (*ServerStreamConn[Req, Resp, Stream]) HandleMessage

func (c *ServerStreamConn[Req, Resp, Stream]) HandleMessage(msg ControlMessage) error

HandleMessage processes control messages from the client

func (*ServerStreamConn[Req, Resp, Stream]) OnClose

func (c *ServerStreamConn[Req, Resp, Stream]) OnClose()

OnClose cancels the gRPC stream and cleans up

func (*ServerStreamConn[Req, Resp, Stream]) OnStart

func (c *ServerStreamConn[Req, Resp, Stream]) OnStart(conn *websocket.Conn) error

OnStart initializes the connection and starts forwarding gRPC responses

func (*ServerStreamConn) SendPing

func (c *ServerStreamConn) SendPing() error

SendPing sends a ping using the gRPC-WS ControlMessage envelope format. This overrides BaseConn.SendPing to use the proper envelope protocol.

type ServerStreamHandler

type ServerStreamHandler[Req proto.Message, Resp proto.Message, Stream ServerStream[Resp]] struct {
	// CreateStream creates the gRPC stream from a request
	CreateStream func(ctx context.Context, req Req) (Stream, error)

	// ParseRequest extracts the initial request from the HTTP request
	ParseRequest func(r *http.Request) (Req, error)

	// MarshalOptions for protojson encoding (optional)
	MarshalOptions protojson.MarshalOptions

	// UnmarshalOptions for protojson decoding (optional)
	UnmarshalOptions protojson.UnmarshalOptions
}

ServerStreamHandler creates ServerStreamConn instances for incoming connections.

func NewServerStreamHandler

func NewServerStreamHandler[Req proto.Message, Resp proto.Message, Stream ServerStream[Resp]](
	createStream func(ctx context.Context, req Req) (Stream, error),
	parseRequest func(r *http.Request) (Req, error),
) *ServerStreamHandler[Req, Resp, Stream]

NewServerStreamHandler creates a handler for server-streaming RPCs.

Example:

handler := grpcws.NewServerStreamHandler(
    func(ctx context.Context, req *pb.SubscribeRequest) (pb.GameService_SubscribeClient, error) {
        return client.Subscribe(ctx, req)
    },
    func(r *http.Request) (*pb.SubscribeRequest, error) {
        vars := mux.Vars(r)
        return &pb.SubscribeRequest{GameId: vars["game_id"]}, nil
    },
)

func (*ServerStreamHandler[Req, Resp, Stream]) Validate

func (h *ServerStreamHandler[Req, Resp, Stream]) Validate(
	w http.ResponseWriter,
	r *http.Request,
) (*ServerStreamConn[Req, Resp, Stream], bool)

Validate implements WSHandler. Parses the request and creates the gRPC stream.

type StreamMetrics

type StreamMetrics struct {
	ConnectedAt  time.Time
	MsgsSent     int64
	MsgsReceived int64
}

StreamMetrics tracks connection statistics

func (*StreamMetrics) IncrementReceived

func (m *StreamMetrics) IncrementReceived() int64

IncrementReceived atomically increments the received counter

func (*StreamMetrics) IncrementSent

func (m *StreamMetrics) IncrementSent() int64

IncrementSent atomically increments the sent counter

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL