Documentation
¶
Overview ¶
Package grpcws provides WebSocket transport for gRPC streaming RPCs.
This package bridges gRPC streaming with WebSocket connections, providing:
- Full lifecycle management (ping/pong, timeouts, graceful shutdown)
- Support for all streaming modes (server, client, bidirectional)
- Type-safe message handling via generics
- Coexistence with grpc-gateway (separate route prefixes)
Why Use gRPC-over-WebSocket? ¶
While grpc-gateway provides SSE (Server-Sent Events) for streaming, WebSocket offers:
- Bidirectional communication (SSE is server-to-client only)
- Heartbeat/ping-pong for dead connection detection
- Connection lifecycle hooks for proper resource management
- Better proxy/load balancer compatibility
Streaming Modes ¶
Server Streaming (rpc Foo(Req) returns (stream Resp)):
handler := grpcws.NewServerStreamHandler(
func(ctx context.Context, req *pb.SubscribeRequest) (pb.Service_SubscribeClient, error) {
return client.Subscribe(ctx, req)
},
parseRequest,
)
router.HandleFunc("/ws/v1/subscribe", gohttp.WSServe(handler, nil))
Client Streaming (rpc Foo(stream Req) returns (Resp)):
handler := grpcws.NewClientStreamHandler(
func(ctx context.Context) (pb.Service_SendClient, error) {
return client.Send(ctx)
},
newRequest, // func() Req: returns a new request message instance used for decoding
)
Bidirectional Streaming (rpc Foo(stream Req) returns (stream Resp)):
handler := grpcws.NewBidiStreamHandler(
func(ctx context.Context) (pb.Service_SyncClient, error) {
return client.Sync(ctx)
},
newRequest, // func() Req: returns a new request message instance used for decoding
)
Message Protocol ¶
All messages are wrapped in a JSON envelope whose "type" field identifies the kind of message:
// Server → Client
{"type": "data", "data": <payload>} // Stream message
{"type": "error", "error": "msg"} // Error
{"type": "stream_end"} // Stream completed
{"type": "ping", "pingId": 1} // Heartbeat
// Client → Server
{"type": "data", "data": <payload>} // Send message (client/bidi streaming)
{"type": "pong", "pingId": 1} // Heartbeat response
{"type": "cancel"} // Cancel stream
{"type": "end_send"} // Half-close (client done sending)
Route Registration ¶
Use a separate route prefix to coexist with grpc-gateway:
// WebSocket on /ws/v1/...
wsRouter := router.PathPrefix("/ws").Subrouter()
wsRouter.HandleFunc("/v1/games/{game_id}/subscribe", gohttp.WSServe(handler, nil))
// grpc-gateway on /v1/... (REST/SSE)
router.PathPrefix("/v1/").Handler(gwMux)
Index ¶
- Constants
- type BidiStream
- type BidiStreamConn
- type BidiStreamHandler
- type ClientStream
- type ClientStreamConn
- type ClientStreamHandler
- type ControlMessage
- type GRPCWSCodec
- func (c *GRPCWSCodec[Req, Resp]) Decode(data []byte, msgType gohttp.MessageType) (ControlMessage, error)
- func (c *GRPCWSCodec[Req, Resp]) DecodeRequest(msg ControlMessage) (Req, error)
- func (c *GRPCWSCodec[Req, Resp]) Encode(msg ControlMessage) ([]byte, gohttp.MessageType, error)
- func (c *GRPCWSCodec[Req, Resp]) EncodeData(resp Resp) (ControlMessage, error)
- type ServerStream
- type ServerStreamConn
- type ServerStreamHandler
- type StreamMetrics
Constants ¶
const (
	TypeData      = "data"
	TypeError     = "error"
	TypeStreamEnd = "stream_end"
	TypePing      = "ping"
	TypePong      = "pong"
	TypeCancel    = "cancel"
	TypeEndSend   = "end_send"
)
Message type constants: the values used in the "type" field of the JSON envelope (ControlMessage.Type).
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BidiStream ¶
type BidiStream[Req proto.Message, Resp proto.Message] interface {
	Send(Req) error
	Recv() (Resp, error)
	CloseSend() error
	grpc.ClientStream
}
BidiStream is the interface for bidirectional streaming gRPC clients
type BidiStreamConn ¶
type BidiStreamConn[Req proto.Message, Resp proto.Message, Stream BidiStream[Req, Resp]] struct {
	// contains filtered or unexported fields
}
BidiStreamConn handles bidirectional streaming RPCs over WebSocket. Both client and server send multiple messages concurrently.
RPC pattern: rpc SyncGame(stream Request) returns (stream Response)
func (*BidiStreamConn[Req, Resp, Stream]) HandleMessage ¶
func (c *BidiStreamConn[Req, Resp, Stream]) HandleMessage(msg ControlMessage) error
HandleMessage processes messages from the client
func (*BidiStreamConn[Req, Resp, Stream]) OnClose ¶
func (c *BidiStreamConn[Req, Resp, Stream]) OnClose()
OnClose cancels the stream and cleans up
type BidiStreamHandler ¶
type BidiStreamHandler[Req proto.Message, Resp proto.Message, Stream BidiStream[Req, Resp]] struct {
	// CreateStream creates the gRPC stream
	CreateStream func(ctx context.Context) (Stream, error)
	// NewRequest creates a new request message instance (for decoding)
	NewRequest func() Req
	// MarshalOptions for protojson encoding (optional)
	MarshalOptions protojson.MarshalOptions
	// UnmarshalOptions for protojson decoding (optional)
	UnmarshalOptions protojson.UnmarshalOptions
}
BidiStreamHandler creates BidiStreamConn instances for incoming connections.
func NewBidiStreamHandler ¶
func NewBidiStreamHandler[Req proto.Message, Resp proto.Message, Stream BidiStream[Req, Resp]]( createStream func(ctx context.Context) (Stream, error), newRequest func() Req, ) *BidiStreamHandler[Req, Resp, Stream]
NewBidiStreamHandler creates a handler for bidirectional streaming RPCs.
Example:
handler := grpcws.NewBidiStreamHandler(
func(ctx context.Context) (pb.GameService_SyncGameClient, error) {
return client.SyncGame(ctx)
},
func() *pb.PlayerAction { return &pb.PlayerAction{} },
)
func (*BidiStreamHandler[Req, Resp, Stream]) Validate ¶
func (h *BidiStreamHandler[Req, Resp, Stream]) Validate( w http.ResponseWriter, r *http.Request, ) (*BidiStreamConn[Req, Resp, Stream], bool)
Validate implements WSHandler. Creates the gRPC stream.
type ClientStream ¶
type ClientStream[Req proto.Message, Resp proto.Message] interface {
	Send(Req) error
	CloseAndRecv() (Resp, error)
	grpc.ClientStream
}
ClientStream is the interface for client-streaming gRPC clients
type ClientStreamConn ¶
type ClientStreamConn[Req proto.Message, Resp proto.Message, Stream ClientStream[Req, Resp]] struct {
	// contains filtered or unexported fields
}
ClientStreamConn handles client-streaming RPCs over WebSocket. The client sends multiple messages; server responds once at the end.
RPC pattern: rpc SendCommands(stream Request) returns (Response)
func (*ClientStreamConn[Req, Resp, Stream]) HandleMessage ¶
func (c *ClientStreamConn[Req, Resp, Stream]) HandleMessage(msg ControlMessage) error
HandleMessage processes messages from the client
func (*ClientStreamConn[Req, Resp, Stream]) OnClose ¶
func (c *ClientStreamConn[Req, Resp, Stream]) OnClose()
OnClose cancels the stream and cleans up
type ClientStreamHandler ¶
type ClientStreamHandler[Req proto.Message, Resp proto.Message, Stream ClientStream[Req, Resp]] struct {
	// CreateStream creates the gRPC stream
	CreateStream func(ctx context.Context) (Stream, error)
	// NewRequest creates a new request message instance (for decoding)
	NewRequest func() Req
	// MarshalOptions for protojson encoding (optional)
	MarshalOptions protojson.MarshalOptions
	// UnmarshalOptions for protojson decoding (optional)
	UnmarshalOptions protojson.UnmarshalOptions
}
ClientStreamHandler creates ClientStreamConn instances for incoming connections.
func NewClientStreamHandler ¶
func NewClientStreamHandler[Req proto.Message, Resp proto.Message, Stream ClientStream[Req, Resp]]( createStream func(ctx context.Context) (Stream, error), newRequest func() Req, ) *ClientStreamHandler[Req, Resp, Stream]
NewClientStreamHandler creates a handler for client-streaming RPCs.
Example:
handler := grpcws.NewClientStreamHandler(
func(ctx context.Context) (pb.GameService_SendCommandsClient, error) {
return client.SendCommands(ctx)
},
func() *pb.GameCommand { return &pb.GameCommand{} },
)
func (*ClientStreamHandler[Req, Resp, Stream]) Validate ¶
func (h *ClientStreamHandler[Req, Resp, Stream]) Validate( w http.ResponseWriter, r *http.Request, ) (*ClientStreamConn[Req, Resp, Stream], bool)
Validate implements WSHandler. Creates the gRPC stream.
type ControlMessage ¶
type ControlMessage struct {
Type string `json:"type"`
Data any `json:"data,omitempty"`
Error string `json:"error,omitempty"`
PingId int64 `json:"pingId,omitempty"`
}
ControlMessage represents the JSON envelope for all WebSocket messages
type GRPCWSCodec ¶
type GRPCWSCodec[Req proto.Message, Resp proto.Message] struct {
	// NewRequest creates a new request message instance
	NewRequest func() Req
	// MarshalOptions for protojson encoding
	MarshalOptions protojson.MarshalOptions
	// UnmarshalOptions for protojson decoding
	UnmarshalOptions protojson.UnmarshalOptions
}
GRPCWSCodec handles encoding/decoding of gRPC messages over WebSocket. It uses a JSON envelope for control messages with protojson payloads.
func (*GRPCWSCodec[Req, Resp]) Decode ¶
func (c *GRPCWSCodec[Req, Resp]) Decode(data []byte, msgType gohttp.MessageType) (ControlMessage, error)
Decode parses a control message from raw WebSocket data
func (*GRPCWSCodec[Req, Resp]) DecodeRequest ¶
func (c *GRPCWSCodec[Req, Resp]) DecodeRequest(msg ControlMessage) (Req, error)
DecodeRequest parses a request from a control message's data field
func (*GRPCWSCodec[Req, Resp]) Encode ¶
func (c *GRPCWSCodec[Req, Resp]) Encode(msg ControlMessage) ([]byte, gohttp.MessageType, error)
Encode serializes a control message envelope into raw WebSocket data, returning the bytes and the WebSocket message type to send them with
func (*GRPCWSCodec[Req, Resp]) EncodeData ¶
func (c *GRPCWSCodec[Req, Resp]) EncodeData(resp Resp) (ControlMessage, error)
EncodeData wraps a proto response in a data message
type ServerStream ¶
type ServerStream[Resp proto.Message] interface {
	Recv() (Resp, error)
	grpc.ClientStream
}
ServerStream is the interface for server-streaming gRPC clients
type ServerStreamConn ¶
type ServerStreamConn[Req proto.Message, Resp proto.Message, Stream ServerStream[Resp]] struct {
	// contains filtered or unexported fields
}
ServerStreamConn handles server-streaming RPCs over WebSocket. The server sends multiple messages; client receives and can send control messages.
RPC pattern: rpc Subscribe(Request) returns (stream Response)
func (*ServerStreamConn[Req, Resp, Stream]) HandleMessage ¶
func (c *ServerStreamConn[Req, Resp, Stream]) HandleMessage(msg ControlMessage) error
HandleMessage processes control messages from the client
func (*ServerStreamConn[Req, Resp, Stream]) OnClose ¶
func (c *ServerStreamConn[Req, Resp, Stream]) OnClose()
OnClose cancels the gRPC stream and cleans up
type ServerStreamHandler ¶
type ServerStreamHandler[Req proto.Message, Resp proto.Message, Stream ServerStream[Resp]] struct {
	// CreateStream creates the gRPC stream from a request
	CreateStream func(ctx context.Context, req Req) (Stream, error)
	// ParseRequest extracts the initial request from the HTTP request
	ParseRequest func(r *http.Request) (Req, error)
	// MarshalOptions for protojson encoding (optional)
	MarshalOptions protojson.MarshalOptions
	// UnmarshalOptions for protojson decoding (optional)
	UnmarshalOptions protojson.UnmarshalOptions
}
ServerStreamHandler creates ServerStreamConn instances for incoming connections.
func NewServerStreamHandler ¶
func NewServerStreamHandler[Req proto.Message, Resp proto.Message, Stream ServerStream[Resp]]( createStream func(ctx context.Context, req Req) (Stream, error), parseRequest func(r *http.Request) (Req, error), ) *ServerStreamHandler[Req, Resp, Stream]
NewServerStreamHandler creates a handler for server-streaming RPCs.
Example:
handler := grpcws.NewServerStreamHandler(
func(ctx context.Context, req *pb.SubscribeRequest) (pb.GameService_SubscribeClient, error) {
return client.Subscribe(ctx, req)
},
func(r *http.Request) (*pb.SubscribeRequest, error) {
vars := mux.Vars(r)
return &pb.SubscribeRequest{GameId: vars["game_id"]}, nil
},
)
func (*ServerStreamHandler[Req, Resp, Stream]) Validate ¶
func (h *ServerStreamHandler[Req, Resp, Stream]) Validate( w http.ResponseWriter, r *http.Request, ) (*ServerStreamConn[Req, Resp, Stream], bool)
Validate implements WSHandler. Parses the request and creates the gRPC stream.
type StreamMetrics ¶
StreamMetrics tracks connection statistics
func (*StreamMetrics) IncrementReceived ¶
func (m *StreamMetrics) IncrementReceived() int64
IncrementReceived atomically increments the received counter
func (*StreamMetrics) IncrementSent ¶
func (m *StreamMetrics) IncrementSent() int64
IncrementSent atomically increments the sent counter