connectsse

v0.1.0
Published: May 3, 2026 | License: Apache-2.0 | Imports: 13 | Imported by: 0

README

connect-sse

Server-Sent Events (SSE) transport layer for Connect RPC.

Motivation

Modern AI agent workloads have increasingly adopted Server-Sent Events (SSE) as their preferred transport protocol for streaming responses. SSE provides a simple, HTTP-based streaming mechanism that works seamlessly across various network infrastructures, including corporate proxies and CDNs that may have difficulty with WebSockets or gRPC.

While Connect RPC offers an excellent framework for building type-safe APIs with protocol buffers, there are scenarios where you need to bridge between Connect RPC's streaming protocol and SSE:

  • AI Gateway Integration: When building gateways that need to expose Connect RPC services through SSE for compatibility with AI agent frameworks
  • Legacy System Support: When integrating with existing systems that expect SSE streams
  • Network Constraints: When operating in environments where only HTTP/1.1 is available or where SSE has better support than other streaming protocols

The connect-sse package provides this bridge by implementing translation at the net/http layer, allowing Connect RPC services to communicate using SSE as the wire format while maintaining full type safety and the ergonomics of Connect RPC.

Installation

go get github.com/firetiger-oss/connect-sse

Usage

Server Example

Expose a Connect RPC handler through an SSE-compatible endpoint:

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"

    "connectrpc.com/connect"
    connectsse "github.com/firetiger-oss/connect-sse"

    greetv1 "example.com/gen/greet/v1"
    "example.com/gen/greet/v1/greetv1connect"
)

type GreetServer struct{}

func (s *GreetServer) Greet(
    ctx context.Context,
    req *connect.Request[greetv1.GreetRequest],
) (*connect.Response[greetv1.GreetResponse], error) {
    return connect.NewResponse(&greetv1.GreetResponse{
        Greeting: "Hello, " + req.Msg.Name,
    }), nil
}

func (s *GreetServer) GreetStream(
    ctx context.Context,
    req *connect.Request[greetv1.GreetRequest],
    stream *connect.ServerStream[greetv1.GreetResponse],
) error {
    for i := range 5 {
        if err := stream.Send(&greetv1.GreetResponse{
            Greeting: fmt.Sprintf("Hello %s, message %d", req.Msg.Name, i),
        }); err != nil {
            return err
        }
    }
    return nil
}

func main() {
    mux := http.NewServeMux()
    path, handler := greetv1connect.NewGreetServiceHandler(&GreetServer{})
    mux.Handle(path, handler)

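    // Expose the Connect RPC mux through the SSE gateway endpoint.
    http.Handle("/sse", &connectsse.Server{
        Handler: mux,
    })

    // ListenAndServe only returns on failure, so report the error.
    log.Fatal(http.ListenAndServe(":8080", nil))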
}

Client Example

Connect to an SSE-based Connect RPC service:

package main

import (
    "context"
    "fmt"
    "log"
    "net/url"

    "connectrpc.com/connect"
    connectsse "github.com/firetiger-oss/connect-sse"

    greetv1 "example.com/gen/greet/v1"
    "example.com/gen/greet/v1/greetv1connect"
)

func main() {
    // Client implements connect.HTTPClient, so it is passed to the generated
    // constructor directly. Setting URL.Path sends every call to /sse.
    transport := &connectsse.Client{
        URL: &url.URL{Path: "/sse"},
    }

    client := greetv1connect.NewGreetServiceClient(
        transport,
        "http://localhost:8080",
        connect.WithProtoJSON(), // required: selects the JSON codec
    )

    ctx := context.Background()
    resp, err := client.Greet(ctx, connect.NewRequest(&greetv1.GreetRequest{
        Name: "World",
    }))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(resp.Msg.Greeting)

    stream, err := client.GreetStream(ctx, connect.NewRequest(&greetv1.GreetRequest{
        Name: "Stream",
    }))
    if err != nil {
        log.Fatal(err)
    }

    for stream.Receive() {
        fmt.Println(stream.Msg().Greeting)
    }
    if err := stream.Err(); err != nil {
        log.Fatal(err)
    }
}

How It Works

Request Flow

The connect-sse package translates between Connect RPC's protocol and a nested HTTP+JSON/SSE format:

Client → Server:

  1. The Connect RPC request is wrapped in a JSON envelope containing the procedure, headers, and message (see the Request type)
  2. Sent as an HTTP POST to the SSE endpoint
  3. Server unwraps the envelope and forwards to the Connect RPC handler
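
The envelope from step 1 can be pictured with a small sketch. The struct below is a local stand-in that copies the JSON field names of the exported Request type; the procedure name and header are made-up sample values, and the exact bytes produced by Client may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// envelope mirrors the JSON fields of connectsse.Request so the sketch
// compiles without the module. It is a stand-in, not the package's type.
type envelope struct {
	Procedure string          `json:"procedure"`
	Header    http.Header     `json:"header,omitempty"`
	Message   json.RawMessage `json:"message,omitempty"`
}

// wrap builds the JSON body of the outer POST for a single RPC.
func wrap(procedure string, header http.Header, message []byte) ([]byte, error) {
	return json.Marshal(envelope{
		Procedure: procedure,
		Header:    header,
		Message:   json.RawMessage(message),
	})
}

func main() {
	body, err := wrap(
		"/greet.v1.GreetService/Greet", // assumed procedure name
		http.Header{"Content-Type": {"application/json"}},
		[]byte(`{"name":"World"}`),
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```

Keeping the message as json.RawMessage means the already-encoded request body is embedded as-is rather than being re-encoded as a string.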

Server → Client:

  • Unary responses: Returned as JSON directly
  • Streaming responses: Connect RPC's enveloped messages are converted to SSE events, then converted back to envelopes by the client
Protocol Translation

The package operates at the net/http layer:

  • Client implements connect.HTTPClient (a Do method with the same signature as http.Client.Do) to intercept and translate requests/responses
  • Server implements http.Handler to unwrap incoming requests and wrap outgoing responses

This design allows the package to work transparently with existing Connect RPC code without modifications to your service implementation.

License

Apache 2.0

Documentation

Overview

Package connectsse provides an HTTP transport layer for Connect RPC that uses Server-Sent Events (SSE) for streaming responses.

This package implements both client and server components that translate between the Connect RPC protocol and an HTTP+JSON/SSE transport layer. The translation happens at the net/http layer:

  • Client.Do implements connect.HTTPClient and converts Connect RPC requests to nested HTTP+JSON requests, then parses JSON or SSE responses back to the Connect RPC format.

  • Server.ServeHTTP implements http.Handler and does the reverse transformation, decoding nested requests and encoding responses as JSON or SSE.

For unary RPCs, requests and responses use application/json. For streaming RPCs, responses travel as Server-Sent Events (text/event-stream) on the wire; the Client then converts them back to the Connect RPC streaming format (application/connect+json with enveloped messages).

Example (Streaming)

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"net/http/httptest"
	"net/url"

	"connectrpc.com/connect"
	connectsse "github.com/firetiger-oss/connect-sse"

	greetv1 "github.com/firetiger-oss/connect-sse/proto/go/example/greet/v1"
	"github.com/firetiger-oss/connect-sse/proto/go/example/greet/v1/greetv1connect"
)

// greetServer implements the GreetService.
type greetServer struct {
	greetv1connect.UnimplementedGreetServiceHandler
}

func (s *greetServer) Greet(
	ctx context.Context,
	req *connect.Request[greetv1.GreetRequest],
) (*connect.Response[greetv1.GreetResponse], error) {
	return connect.NewResponse(&greetv1.GreetResponse{
		Greeting: fmt.Sprintf("Hello, %s!", req.Msg.Name),
	}), nil
}

func (s *greetServer) GreetStream(
	ctx context.Context,
	req *connect.Request[greetv1.GreetRequest],
	stream *connect.ServerStream[greetv1.GreetResponse],
) error {
	for i := range 3 {
		if err := stream.Send(&greetv1.GreetResponse{
			Greeting: fmt.Sprintf("Hello, %s! (message %d)", req.Msg.Name, i+1),
		}); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// Create a Connect RPC handler
	mux := http.NewServeMux()
	path, handler := greetv1connect.NewGreetServiceHandler(
		&greetServer{},
		connect.WithCompression("gzip", nil, nil), // Disable gzip compression
	)
	mux.Handle(path, handler)

	// Wrap with SSE server
	server := httptest.NewServer(&connectsse.Server{
		Handler: mux,
	})
	defer server.Close()

	// Create Connect RPC client with SSE transport
	serverURL, _ := url.Parse(server.URL)
	client := greetv1connect.NewGreetServiceClient(
		&connectsse.Client{},
		serverURL.String(),
		connect.WithProtoJSON(),
	)

	// Make streaming call
	ctx := context.Background()
	stream, err := client.GreetStream(ctx, connect.NewRequest(&greetv1.GreetRequest{
		Name: "Streaming",
	}))
	if err != nil {
		log.Fatal(err)
	}

	// Receive streamed messages
	for stream.Receive() {
		fmt.Println(stream.Msg().Greeting)
	}
	if err := stream.Err(); err != nil {
		log.Fatal(err)
	}

}

Output:
Hello, Streaming! (message 1)
Hello, Streaming! (message 2)
Hello, Streaming! (message 3)

Example (Unary)

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"net/http/httptest"
	"net/url"

	"connectrpc.com/connect"
	connectsse "github.com/firetiger-oss/connect-sse"

	greetv1 "github.com/firetiger-oss/connect-sse/proto/go/example/greet/v1"
	"github.com/firetiger-oss/connect-sse/proto/go/example/greet/v1/greetv1connect"
)

// greetServer implements the GreetService.
type greetServer struct {
	greetv1connect.UnimplementedGreetServiceHandler
}

func (s *greetServer) Greet(
	ctx context.Context,
	req *connect.Request[greetv1.GreetRequest],
) (*connect.Response[greetv1.GreetResponse], error) {
	return connect.NewResponse(&greetv1.GreetResponse{
		Greeting: fmt.Sprintf("Hello, %s!", req.Msg.Name),
	}), nil
}

func (s *greetServer) GreetStream(
	ctx context.Context,
	req *connect.Request[greetv1.GreetRequest],
	stream *connect.ServerStream[greetv1.GreetResponse],
) error {
	for i := range 3 {
		if err := stream.Send(&greetv1.GreetResponse{
			Greeting: fmt.Sprintf("Hello, %s! (message %d)", req.Msg.Name, i+1),
		}); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// Create a Connect RPC handler
	mux := http.NewServeMux()
	path, handler := greetv1connect.NewGreetServiceHandler(
		&greetServer{},
		connect.WithCompression("gzip", nil, nil), // Disable gzip compression
	)
	mux.Handle(path, handler)

	// Wrap with SSE server
	server := httptest.NewServer(&connectsse.Server{
		Handler: mux,
	})
	defer server.Close()

	// Create Connect RPC client with SSE transport
	serverURL, _ := url.Parse(server.URL)
	client := greetv1connect.NewGreetServiceClient(
		&connectsse.Client{},
		serverURL.String(),
		connect.WithProtoJSON(),
	)

	// Make unary call
	ctx := context.Background()
	resp, err := client.Greet(ctx, connect.NewRequest(&greetv1.GreetRequest{
		Name: "World",
	}))
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(resp.Msg.Greeting)
}

Output:
Hello, World!

Types

type Client

type Client struct {
	// URL is the address of the SSE gateway. Any fields set on URL override
	// the corresponding fields of the outgoing request URL; unset fields fall
	// back to the values from the request. If nil, the request URL is used
	// unchanged, which is appropriate when the Connect RPC client and the SSE
	// gateway share the same base URL.
	URL *url.URL
	connect.HTTPClient
}

Client implements connect.HTTPClient and translates Connect RPC requests to HTTP+JSON requests with nested request structure, and translates JSON or SSE responses back to the Connect RPC format.

func (*Client) Do

func (c *Client) Do(req *http.Request) (*http.Response, error)

Do implements connect.HTTPClient.

type Request

type Request struct {
	Procedure string          `json:"procedure"`
	Header    http.Header     `json:"header,omitempty"`
	Message   json.RawMessage `json:"message,omitempty"`
}

Request represents a nested Connect RPC request that will be encoded in the body of the outer HTTP POST request sent by the Client.

type Server

type Server struct {
	Handler http.Handler
}

Server implements http.Handler and translates HTTP+JSON requests with nested request structure to Connect RPC requests, and translates Connect RPC responses back to JSON or SSE format.

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements http.Handler.

Directories

Path Synopsis
proto
