Documentation
¶
Overview ¶
Package connectsse provides an HTTP transport layer for Connect RPC that uses Server-Sent Events (SSE) for streaming responses.
This package implements both client and server components that translate between the Connect RPC protocol and an HTTP+JSON/SSE transport layer. The translation happens at the net/http layer:
Client.Do implements connect.HTTPClient and converts Connect RPC requests to nested HTTP+JSON requests, then parses JSON or SSE responses back to the Connect RPC format.
Server.ServeHTTP implements http.Handler and does the reverse transformation, decoding nested requests and encoding responses as JSON or SSE.
For unary RPCs, requests and responses use application/json. For streaming RPCs, responses use Server-Sent Events (text/event-stream) on the wire, but are converted to the Connect RPC streaming format (application/connect+json with enveloped messages).
Example (Streaming) ¶
package main
import (
"context"
"fmt"
"log"
"net/http"
"net/http/httptest"
"net/url"
"connectrpc.com/connect"
connectsse "github.com/firetiger-oss/connect-sse"
greetv1 "github.com/firetiger-oss/connect-sse/proto/go/example/greet/v1"
"github.com/firetiger-oss/connect-sse/proto/go/example/greet/v1/greetv1connect"
)
// greetServer implements the GreetService.
type greetServer struct {
greetv1connect.UnimplementedGreetServiceHandler
}
func (s *greetServer) Greet(
ctx context.Context,
req *connect.Request[greetv1.GreetRequest],
) (*connect.Response[greetv1.GreetResponse], error) {
return connect.NewResponse(&greetv1.GreetResponse{
Greeting: fmt.Sprintf("Hello, %s!", req.Msg.Name),
}), nil
}
func (s *greetServer) GreetStream(
ctx context.Context,
req *connect.Request[greetv1.GreetRequest],
stream *connect.ServerStream[greetv1.GreetResponse],
) error {
for i := range 3 {
if err := stream.Send(&greetv1.GreetResponse{
Greeting: fmt.Sprintf("Hello, %s! (message %d)", req.Msg.Name, i+1),
}); err != nil {
return err
}
}
return nil
}
func main() {
// Create a Connect RPC handler
mux := http.NewServeMux()
path, handler := greetv1connect.NewGreetServiceHandler(
&greetServer{},
connect.WithCompression("gzip", nil, nil), // Disable gzip compression
)
mux.Handle(path, handler)
// Wrap with SSE server
server := httptest.NewServer(&connectsse.Server{
Handler: mux,
})
defer server.Close()
// Create Connect RPC client with SSE transport
serverURL, _ := url.Parse(server.URL)
client := greetv1connect.NewGreetServiceClient(
&connectsse.Client{},
serverURL.String(),
connect.WithProtoJSON(),
)
// Make streaming call
ctx := context.Background()
stream, err := client.GreetStream(ctx, connect.NewRequest(&greetv1.GreetRequest{
Name: "Streaming",
}))
if err != nil {
log.Fatal(err)
}
// Receive streamed messages
for stream.Receive() {
fmt.Println(stream.Msg().Greeting)
}
if err := stream.Err(); err != nil {
log.Fatal(err)
}
}
Output: Hello, Streaming! (message 1) Hello, Streaming! (message 2) Hello, Streaming! (message 3)
Example (Unary) ¶
package main
import (
"context"
"fmt"
"log"
"net/http"
"net/http/httptest"
"net/url"
"connectrpc.com/connect"
connectsse "github.com/firetiger-oss/connect-sse"
greetv1 "github.com/firetiger-oss/connect-sse/proto/go/example/greet/v1"
"github.com/firetiger-oss/connect-sse/proto/go/example/greet/v1/greetv1connect"
)
// greetServer implements the GreetService.
type greetServer struct {
greetv1connect.UnimplementedGreetServiceHandler
}
func (s *greetServer) Greet(
ctx context.Context,
req *connect.Request[greetv1.GreetRequest],
) (*connect.Response[greetv1.GreetResponse], error) {
return connect.NewResponse(&greetv1.GreetResponse{
Greeting: fmt.Sprintf("Hello, %s!", req.Msg.Name),
}), nil
}
func (s *greetServer) GreetStream(
ctx context.Context,
req *connect.Request[greetv1.GreetRequest],
stream *connect.ServerStream[greetv1.GreetResponse],
) error {
for i := range 3 {
if err := stream.Send(&greetv1.GreetResponse{
Greeting: fmt.Sprintf("Hello, %s! (message %d)", req.Msg.Name, i+1),
}); err != nil {
return err
}
}
return nil
}
func main() {
// Create a Connect RPC handler
mux := http.NewServeMux()
path, handler := greetv1connect.NewGreetServiceHandler(
&greetServer{},
connect.WithCompression("gzip", nil, nil), // Disable gzip compression
)
mux.Handle(path, handler)
// Wrap with SSE server
server := httptest.NewServer(&connectsse.Server{
Handler: mux,
})
defer server.Close()
// Create Connect RPC client with SSE transport
serverURL, _ := url.Parse(server.URL)
client := greetv1connect.NewGreetServiceClient(
&connectsse.Client{},
serverURL.String(),
connect.WithProtoJSON(),
)
// Make unary call
ctx := context.Background()
resp, err := client.Greet(ctx, connect.NewRequest(&greetv1.GreetRequest{
Name: "World",
}))
if err != nil {
log.Fatal(err)
}
fmt.Println(resp.Msg.Greeting)
}
Output: Hello, World!
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// URL is the address of the SSE gateway. Any fields set on URL override
// the corresponding fields of the outgoing request URL; unset fields fall
// back to the values from the request. If nil, the request URL is used
// unchanged, which is appropriate when the Connect RPC client and the SSE
// gateway share the same base URL.
URL *url.URL
connect.HTTPClient
}
Client implements connect.HTTPClient and translates Connect RPC requests to HTTP+JSON requests with nested request structure, and translates JSON or SSE responses back to the Connect RPC format.
type Request ¶
type Request struct {
Procedure string `json:"procedure"`
Header http.Header `json:"header,omitempty"`
Message json.RawMessage `json:"message,omitempty"`
}
Request represents a nested Connect RPC request that will be encoded in the body of the outer HTTP POST request sent by the Client.