connect

package module
v0.0.0-...-c292db9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2022 License: Apache-2.0 Imports: 25 Imported by: 0

README

connect

Build Report Card GoDoc

Connect is a small framework for building HTTP APIs. You write a short API definition file and implement your application logic, and connect generates code to handle marshaling, routing, error handling, and content-type negotiation. It also generates an idiomatic, type-safe client.

Connect is wire-compatible with the gRPC protocol, including streaming. Connect servers interoperate seamlessly with generated clients in more than a dozen languages, command-line tools like grpcurl, and proxies like Envoy and gRPC-Gateway. They also support gRPC-Web natively, so they can serve browser traffic without a translating proxy. Connect clients work with any gRPC or gRPC-Web server.

Under the hood, connect is just protocol buffers and the standard library: no custom HTTP implementation, no new name resolution or load balancing APIs, and no surprises. Everything you already know about net/http still applies, and any package that works with an http.Server, http.Client, or http.Handler also works with connect.

For more on connect, including a walkthrough and a comparison to alternatives, see the docs.

A Small Example

Curious what all this looks like in practice? From a protobuf schema, we generate a small RPC package. Using that package, we can build a server:

package main

import (
  "log"
  "net/http"

  "github.com/bufbuild/connect"
  "github.com/bufbuild/connect/internal/gen/connect/connect/ping/v1/pingv1connect"
  pingv1 "github.com/bufbuild/connect/internal/gen/go/connect/ping/v1"
)

type PingServer struct {
  pingv1connect.UnimplementedPingServiceHandler // returns errors from all methods
}

func (ps *PingServer) Ping(
  ctx context.Context,
  req *connect.Request[pingv1.PingRequest]) (*connect.Response[pingv1.PingResponse], error) {
  // connect.Request and connect.Response give you direct access to headers and
  // trailers. No context-based nonsense!
  log.Println(req.Header().Get("Some-Header"))
  res := connect.NewResponse(&pingv1.PingResponse{
    // req.Msg is a strongly-typed *pingv1.PingRequest, so we can access its
    // fields without type assertions.
    Number: req.Msg.Number,
  })
  res.Header().Set("Some-Other-Header", "hello!")
  res.Trailer().Set("Some-Trailer", "goodbye!")
  return res, nil
}

func main() {
  mux := http.NewServeMux()
  // The generated constructors return a path and a plain net/http
  // handler.
  mux.Handle(pingv1.NewPingServiceHandler(&PingServer{}))
  http.ListenAndServeTLS(":8081", "server.crt", "server.key", mux)
}

With that server running, you can make requests with any gRPC client. Using connect,

package main

import (
  "log"
  "net/http"

  "github.com/bufbuild/connect"
  "github.com/bufbuild/connect/internal/gen/connect/connect/ping/v1/pingv1connect"
  pingv1 "github.com/bufbuild/connect/internal/gen/go/connect/ping/v1"
)

func main() {
  client, err := pingv1connect.NewPingServiceClient(
    http.DefaultClient,
    "https://localhost:8081/",
  )
  if err != nil {
    log.Fatalln(err)
  }
  req := connect.NewRequest(&pingv1.PingRequest{
    Number: 42,
  })
  req.Header().Set("Some-Header", "hello from connect")
  res, err := client.Ping(context.Background(), req)
  if err != nil {
    log.Fatalln(err)
  }
  log.Println(res.Msg)
  log.Println(res.Header().Get("Some-Other-Header"))
  log.Println(res.Trailer().Get("Some-Trailer"))
}

You can find production-ready examples of servers and clients in the API documentation.

Status

Connect is in beta: we use it internally, but expect the Go community to discover new patterns for working with generics in the coming months. We plan to tag a release candidate in July 2022 and stable v1 soon after the Go 1.19 release.

Support and Versioning

Connect supports:

Within those parameters, connect follows semantic versioning. We make no exceptions for deprecated or experimental APIs.

Offered under the Apache 2 license.

Documentation

Overview

Package connect is an RPC framework built on protocol buffers and net/http. It's wire-compatible with gRPC and gRPC-Web, including support for streaming.

This documentation is intended to explain each type and function in isolation. For walkthroughs, comparisons to grpc-go, and other narrative docs, see https://bufconnect.com.

Example (Client)
logger := log.New(os.Stdout, "" /* prefix */, 0 /* flags */)
// Timeouts, connection pooling, custom dialers, and other low-level
// transport details are handled by net/http. Everything you already know
// (or everything you learn) about hardening net/http Clients applies to
// connect too.
//
// Of course, you can skip this configuration and use http.DefaultClient for
// quick proof-of-concept code.
httpClient := &http.Client{
	Timeout: 5 * time.Second,
	Transport: &http.Transport{
		Proxy: nil,
		// connect handles compression on a per-message basis, so it's a waste to
		// compress the whole response body.
		DisableCompression: true,
		MaxIdleConns:       128,
		// RPC clients tend to make many requests to few hosts, so allow more
		// idle connections per host.
		MaxIdleConnsPerHost:    16,
		IdleConnTimeout:        90 * time.Second,
		MaxResponseHeaderBytes: 8 * 1024, // 8 KiB, gRPC's recommended setting
	},
	CheckRedirect: func(_ *http.Request, _ []*http.Request) error {
		// Don't follow any redirects.
		return http.ErrUseLastResponse
	},
}
// Unfortunately, pkg.go.dev can't run examples that actually use the
// network. To keep this example runnable, we'll use an HTTP server and
// client that communicate over in-memory pipes. Don't do this in production!
httpClient = examplePingServer.Client()

client, err := pingv1connect.NewPingServiceClient(
	httpClient,
	examplePingServer.URL(),
	connect.WithGRPC(),
)
if err != nil {
	logger.Println("error:", err)
	return
}
res, err := client.Ping(
	context.Background(),
	connect.NewRequest(&pingv1.PingRequest{Number: 42}),
)
if err != nil {
	logger.Println("error:", err)
	return
}
logger.Println("response content-type:", res.Header().Get("Content-Type"))
logger.Println("response message:", res.Msg)
Output:

response content-type: application/grpc+proto
response message: number:42
Example (Handler)
package main

import (
	"context"
	"net/http"
	"time"

	"github.com/bufbuild/connect"
	"github.com/bufbuild/connect/internal/gen/connect/connect/ping/v1/pingv1connect"
	pingv1 "github.com/bufbuild/connect/internal/gen/go/connect/ping/v1"
)

// ExamplePingServer implements some trivial business logic. The protobuf
// definition for this API is in proto/connect/ping/v1/ping.proto.
type ExamplePingServer struct {
	pingv1connect.UnimplementedPingServiceHandler
}

// Ping implements pingv1connect.PingServiceHandler.
func (*ExamplePingServer) Ping(
	_ context.Context,
	req *connect.Request[pingv1.PingRequest],
) (*connect.Response[pingv1.PingResponse], error) {
	return connect.NewResponse(&pingv1.PingResponse{
		Number: req.Msg.Number,
		Text:   req.Msg.Text,
	}), nil
}

func main() {
	// The business logic here is trivial, but the rest of the example is meant
	// to be somewhat realistic. This server has basic timeouts configured, and
	// it also exposes gRPC's server reflection and health check APIs.

	// protoc-gen-connect-go generates constructors that return plain net/http
	// Handlers, so they're compatible with most Go HTTP routers and middleware
	// (for example, net/http's StripPrefix).
	mux := http.NewServeMux()
	mux.Handle(pingv1connect.NewPingServiceHandler(
		&ExamplePingServer{},                // our business logic
		connect.WithReadMaxBytes(1024*1024), // limit request size
	))
	// You can serve gRPC's health and server reflection APIs using
	// github.com/bufbuild/connect-ecosystem-go.

	// Timeouts, connection handling, TLS configuration, and other low-level
	// transport details are handled by net/http. Everything you already know (or
	// anything you learn) about hardening net/http Servers applies to connect
	// too. Keep in mind that any timeouts you set will also apply to streaming
	// RPCs!
	//
	// If you're not familiar with the many timeouts exposed by net/http, start with
	// https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/.
	srv := &http.Server{
		Addr:           ":http",
		Handler:        mux,
		ReadTimeout:    2500 * time.Millisecond,
		WriteTimeout:   5 * time.Second,
		MaxHeaderBytes: 8 * 1024, // 8KiB, gRPC's recommendation
	}
	// You could also use golang.org/x/net/http2/h2c to serve gRPC requests
	// without TLS.
	srv.ListenAndServeTLS("testdata/server.crt", "testdata/server.key")
}

Index

Examples

Constants

View Source
const (
	StreamTypeUnary  StreamType = 0b00
	StreamTypeClient            = 0b01
	StreamTypeServer            = 0b10
	StreamTypeBidi              = StreamTypeClient | StreamTypeServer
)
View Source
const IsAtLeastVersion0_0_1 = true

These constants are used in compile-time handshakes with connect's generated code.

View Source
const Version = "0.0.1"

Version is the semantic version of the connect module.

Variables

This section is empty.

Functions

func DecodeBinaryHeader

func DecodeBinaryHeader(data string) ([]byte, error)

DecodeBinaryHeader base64-decodes the data. It can decode padded or unpadded values.

Binary headers sent by Google's gRPC implementations always have keys ending in "-Bin".

func EncodeBinaryHeader

func EncodeBinaryHeader(data []byte) string

EncodeBinaryHeader base64-encodes the data. It always emits unpadded values.

For interoperability with Google's gRPC implementations, binary headers should have keys ending in "-Bin".

Types

type AnyRequest

type AnyRequest interface {
	Any() any
	Spec() Specification
	Header() http.Header
	// contains filtered or unexported methods
}

AnyRequest is the common method set of all Requests, regardless of type parameter. It's used in unary interceptors.

To preserve our ability to add methods to this interface without breaking backward compatibility, only types defined in this package can implement AnyRequest.

type AnyResponse

type AnyResponse interface {
	Any() any
	Header() http.Header
	Trailer() http.Header
	// contains filtered or unexported methods
}

AnyResponse is the common method set of all Responses, regardless of type parameter. It's used in unary interceptors.

To preserve our ability to add methods to this interface without breaking backward compatibility, only types defined in this package can implement AnyRequest.

type BidiStream

type BidiStream[Req, Res any] struct {
	// contains filtered or unexported fields
}

BidiStream is the handler's view of a bidirectional streaming RPC.

func NewBidiStream

func NewBidiStream[Req, Res any](s Sender, r Receiver) *BidiStream[Req, Res]

NewBidiStream constructs the handler's view of a bidirectional streaming RPC.

func (*BidiStream[Req, Res]) Receive

func (b *BidiStream[Req, Res]) Receive() (*Req, error)

Receive a message. When the client is done sending messages, Receive will return an error that wraps io.EOF.

func (*BidiStream[Req, Res]) RequestHeader

func (b *BidiStream[Req, Res]) RequestHeader() http.Header

RequestHeader returns the headers received from the client.

func (*BidiStream[Req, Res]) ResponseHeader

func (b *BidiStream[Req, Res]) ResponseHeader() http.Header

ResponseHeader returns the response headers. Headers are sent with the first call to Send.

func (*BidiStream[Req, Res]) ResponseTrailer

func (b *BidiStream[Req, Res]) ResponseTrailer() http.Header

ResponseTrailer returns the response trailers. Handlers may write to the response trailers at any time before returning.

func (*BidiStream[Req, Res]) Send

func (b *BidiStream[Req, Res]) Send(msg *Res) error

Send a message to the client. The first call to Send also sends the response headers.

type BidiStreamForClient

type BidiStreamForClient[Req, Res any] struct {
	// contains filtered or unexported fields
}

BidiStreamForClient is the client's view of a bidirectional streaming RPC.

func NewBidiStreamForClient

func NewBidiStreamForClient[Req, Res any](s Sender, r Receiver) *BidiStreamForClient[Req, Res]

NewBidiStreamForClient constructs the client's view of a bidirectional streaming RPC.

func (*BidiStreamForClient[Req, Res]) CloseReceive

func (b *BidiStreamForClient[Req, Res]) CloseReceive() error

CloseReceive closes the receive side of the stream.

func (*BidiStreamForClient[Req, Res]) CloseSend

func (b *BidiStreamForClient[Req, Res]) CloseSend() error

CloseSend closes the send side of the stream.

func (*BidiStreamForClient[Req, Res]) Receive

func (b *BidiStreamForClient[Req, Res]) Receive() (*Res, error)

Receive a message. When the server is done sending messages and no other errors have occurred, Receive will return an error that wraps io.EOF.

func (*BidiStreamForClient[Req, Res]) RequestHeader

func (b *BidiStreamForClient[Req, Res]) RequestHeader() http.Header

RequestHeader returns the request headers. Headers are sent with the first call to Send.

func (*BidiStreamForClient[Req, Res]) ResponseHeader

func (b *BidiStreamForClient[Req, Res]) ResponseHeader() http.Header

ResponseHeader returns the headers received from the server. It blocks until the first call to Receive returns.

func (*BidiStreamForClient[Req, Res]) ResponseTrailer

func (b *BidiStreamForClient[Req, Res]) ResponseTrailer() http.Header

ResponseTrailer returns the trailers received from the server. Trailers aren't fully populated until Receive() returns an error wrapping io.EOF.

func (*BidiStreamForClient[Req, Res]) Send

func (b *BidiStreamForClient[Req, Res]) Send(msg *Req) error

Send a message to the server. The first call to Send also sends the request headers.

If the server returns an error, Send returns an error that wraps io.EOF. Clients should check for case using the standard library's errors.Is and unmarshal the error using Receive.

type Client

type Client[Req, Res any] struct {
	// contains filtered or unexported fields
}

Client is a reusable, concurrency-safe client for a single procedure. Depending on the procedure's type, use the CallUnary, CallClientStream, CallServerStream, or CallBidiStream method.

By default, clients use the binary protobuf Codec, ask for gzipped responses, and send uncompressed requests. They don't have a default protocol; callers of NewClient or generated client constructors must explicitly choose a protocol with either the WithGRPC or WithGRPCWeb options.

func NewClient

func NewClient[Req, Res any](
	httpClient HTTPClient,
	url string,
	options ...ClientOption,
) (*Client[Req, Res], error)

NewClient constructs a new Client.

func (*Client[Req, Res]) CallBidiStream

func (c *Client[Req, Res]) CallBidiStream(ctx context.Context) *BidiStreamForClient[Req, Res]

CallBidiStream calls a bidirectional streaming procedure.

func (*Client[Req, Res]) CallClientStream

func (c *Client[Req, Res]) CallClientStream(ctx context.Context) *ClientStreamForClient[Req, Res]

CallClientStream calls a client streaming procedure.

func (*Client[Req, Res]) CallServerStream

func (c *Client[Req, Res]) CallServerStream(
	ctx context.Context,
	req *Request[Req],
) (*ServerStreamForClient[Res], error)

CallServerStream calls a server streaming procedure.

func (*Client[Req, Res]) CallUnary

func (c *Client[Req, Res]) CallUnary(
	ctx context.Context,
	req *Request[Req],
) (*Response[Res], error)

CallUnary calls a request-response procedure.

type ClientOption

type ClientOption interface {
	// contains filtered or unexported methods
}

A ClientOption configures a connect client.

In addition to any options grouped in the documentation below, remember that Options are also valid ClientOptions.

func WithClientOptions

func WithClientOptions(options ...ClientOption) ClientOption

WithClientOptions composes multiple ClientOptions into one.

func WithGRPC

func WithGRPC() ClientOption

WithGRPC configures clients to use the HTTP/2 gRPC protocol.

func WithGRPCWeb

func WithGRPCWeb() ClientOption

WithGRPCWeb configures clients to use the gRPC-Web protocol.

func WithGzipRequests

func WithGzipRequests() ClientOption

WithGzipRequests configures the client to gzip requests. It requires that the client already have a registered gzip compressor (via either WithGzip or WithCompressor).

Because some servers don't support gzip, clients default to sending uncompressed requests.

func WithRequestCompression

func WithRequestCompression(name string) ClientOption

WithRequestCompression configures the client to use the specified algorithm to compress request messages. If the algorithm has not been registered using WithCompression, the generated client constructor will return an error.

Because some servers don't support compression, clients default to sending uncompressed requests.

type ClientStream

type ClientStream[Req, Res any] struct {
	// contains filtered or unexported fields
}

ClientStream is the handler's view of a client streaming RPC.

func NewClientStream

func NewClientStream[Req, Res any](s Sender, r Receiver) *ClientStream[Req, Res]

NewClientStream constructs the handler's view of a client streaming RPC.

func (*ClientStream[Req, Res]) Err

func (c *ClientStream[Req, Res]) Err() error

Err returns the first non-EOF error that was encountered by Receive.

func (*ClientStream[Req, Res]) Msg

func (c *ClientStream[Req, Res]) Msg() *Req

Msg returns the most recent message unmarshaled by a call to Receive. The returned message points to data that will be overwritten by the next call to Receive.

func (*ClientStream[Req, Res]) Receive

func (c *ClientStream[Req, Res]) Receive() bool

Receive advances the stream to the next message, which will then be available through the Msg method. It returns false when the stream stops, either by reaching the end or by encountering an unexpected error. After Receive returns false, the Err method will return any unexpected error encountered.

func (*ClientStream[Req, Res]) RequestHeader

func (c *ClientStream[Req, Res]) RequestHeader() http.Header

RequestHeader returns the headers received from the client.

func (*ClientStream[Req, Res]) SendAndClose

func (c *ClientStream[Req, Res]) SendAndClose(envelope *Response[Res]) error

SendAndClose closes the receive side of the stream, then sends a response back to the client.

type ClientStreamForClient

type ClientStreamForClient[Req, Res any] struct {
	// contains filtered or unexported fields
}

ClientStreamForClient is the client's view of a client streaming RPC.

func NewClientStreamForClient

func NewClientStreamForClient[Req, Res any](s Sender, r Receiver) *ClientStreamForClient[Req, Res]

NewClientStreamForClient constructs the client's view of a client streaming RPC.

func (*ClientStreamForClient[Req, Res]) CloseAndReceive

func (c *ClientStreamForClient[Req, Res]) CloseAndReceive() (*Response[Res], error)

CloseAndReceive closes the send side of the stream and waits for the response.

func (*ClientStreamForClient[Req, Res]) RequestHeader

func (c *ClientStreamForClient[Req, Res]) RequestHeader() http.Header

RequestHeader returns the request headers. Headers are sent to the server with the first call to Send.

func (*ClientStreamForClient[Req, Res]) Send

func (c *ClientStreamForClient[Req, Res]) Send(msg *Req) error

Send a message to the server. The first call to Send also sends the request headers.

If the server returns an error, Send returns an error that wraps io.EOF. Clients should check for case using the standard library's errors.Is and unmarshal the error using CloseAndReceive.

type Code

type Code uint32

A Code is one of gRPC's canonical status codes. There are no user-defined codes, so only the codes enumerated below are valid.

See the specification at https://github.com/grpc/grpc/blob/master/doc/statuscodes.md for detailed descriptions of each code and example usage.

const (

	// CodeCanceled indicates that the operation was canceled, typically by the
	// caller.
	//
	// Note that connect follows the gRPC specification (and some, but not all,
	// implementations) and uses the British "CANCELLED" as CodeCanceled's string
	// representation rather than the American "CANCELED".
	CodeCanceled Code = 1

	// CodeUnknown indicates that the operation failed for an unknown reason.
	CodeUnknown Code = 2

	// CodeInvalidArgument indicates that client supplied an invalid argument.
	//
	// Note that this differs from CodeFailedPrecondition: CodeInvalidArgument
	// indicates that the argument(s) are problematic regardless of the state of
	// the system (for example, an invalid URL).
	CodeInvalidArgument Code = 3

	// CodeDeadlineExceeded indicates that deadline expired before the operation
	// could complete. For operations that change the state of the system, this
	// error may be returned even if the operation has completed successfully
	// (but late).
	CodeDeadlineExceeded Code = 4

	// CodeNotFound indicates that some requested entity (for example, a file or
	// directory) was not found.
	//
	// If an operation is denied for an entire class of users, such as gradual
	// feature rollout or an undocumented allowlist, CodeNotFound may be used. If
	// a request is denied for some users within a class of users, such as
	// user-based access control, CodePermissionDenied must be used.
	CodeNotFound Code = 5

	// CodeAlreadyExists indicates that client attempted to create an entity (for
	// example, a file or directory) that already exists.
	CodeAlreadyExists Code = 6

	// CodePermissionDenied indicates that the caller does'nt have permission to
	// execute the specified operation.
	//
	// CodePermissionDenied must not be used for rejections caused by exhausting
	// some resource (use CodeResourceExhausted instead). CodePermissionDenied
	// must not be used if the caller can't be identified (use
	// CodeUnauthenticated instead). This error code doesn't imply that the
	// request is valid, the requested entity exists, or other preconditions are
	// satisfied.
	CodePermissionDenied Code = 7

	// CodeResourceExhausted indicates that some resource has been exhausted. For
	// example, a per-user quota may be exhausted or the entire file system may
	// be full.
	CodeResourceExhausted Code = 8

	// CodeFailedPrecondition indicates that the system is not in a state
	// required for the operation's execution.
	//
	// Service implementors can use the following guidelines to decide between
	// CodeFailedPrecondition, CodeAborted, and CodeUnavailable:
	//
	//   - Use CodeUnavailable if the client can retry just the failing call.
	//   - Use CodeAborted if the client should retry at a higher level. For
	//   example, if a client-specified test-and-set fails, the client should
	//   restart the whole read-modify-write sequence.
	//   - Use CodeFailedPrecondition if the client should not retry until the
	//   system state has been explicitly fixed. For example, a deleting a
	//   directory on the filesystem might return CodeFailedPrecondition if the
	//   directory still contains files, since the client should not retry unless
	//   they first delete the offending files.
	CodeFailedPrecondition Code = 9

	// CodeAborted indicates that operation was aborted by the system, usually
	// because of a concurrency issue such as a sequencer check failure or
	// transaction abort.
	//
	// The documentation for CodeFailedPrecondition includes guidelines for
	// choosing between CodeFailedPrecondition, CodeAborted, and CodeUnavailable.
	CodeAborted Code = 10

	// CodeOutOfRange indicates that the operation was attempted past the valid
	// range (for example, seeking past end-of-file).
	//
	// Unlike CodeInvalidArgument, this error indicates a problem that may be
	// fixed if the system state changes. For example, a 32-bit file system will
	// generate CodeInvalidArgument if asked to read at an offset that is not in
	// the range [0,2^32), but it will generate CodeOutOfRange if asked to read
	// from an offset past the current file size.
	//
	// CodeOutOfRange naturally overlaps with CodeFailedPrecondition. Where
	// possible, use the more specific CodeOutOfRange so that callers who are
	// iterating through a space can easily detect when they're done.
	CodeOutOfRange Code = 11

	// CodeUnimplemented indicates that the operation isn't implemented,
	// supported, or enabled in this service.
	CodeUnimplemented Code = 12

	// CodeInternal indicates that some invariants expected by the underlying
	// system have been broken. This code is reserved for serious errors.
	CodeInternal Code = 13

	// CodeUnavailable indicates that the service is currently unavailable. This
	// is usually temporary, so clients can back off and retry idempotent
	// operations.
	CodeUnavailable Code = 14

	// CodeDataLoss indicates that the operation has resulted in unrecoverable
	// data loss or corruption.
	CodeDataLoss Code = 15

	// CodeUnauthenticated indicates that the request does not have valid
	// authentication credentials for the operation.
	CodeUnauthenticated Code = 16
)

func CodeOf

func CodeOf(err error) Code

CodeOf returns the error's status code if it is or wraps a *connect.Error and CodeUnknown otherwise.

func (Code) String

func (c Code) String() string

type Codec

type Codec interface {
	// Name returns the name of the Codec.
	//
	// This may be used as part of the Content-Type within HTTP. For example,
	// with gRPC this is the content subtype, that is "application/grpc+proto"
	// will map to the Codec with name "proto".
	//
	// Names are expected to not be empty.
	Name() string
	// Marshal marshals the given message.
	//
	// Marshal may expect a specific type of message, and will error if this type is not given.
	Marshal(any) ([]byte, error)
	// Marshal unmarshals the given message.
	//
	// Unmarshal may expect a specific type of message, and will error if this type is not given.
	Unmarshal([]byte, any) error
}

Codec marshals structs (typically generated from a schema) to and from bytes.

type Compressor

type Compressor interface {
	io.Writer

	// Close flushes any buffered data to the underlying sink, then closes the
	// Compressor. It must not close the underlying sink.
	Close() error

	// Reset discards the Compressor's internal state, if any, and prepares it to
	// write compressed data to a new sink.
	Reset(io.Writer)
}

A Compressor is a reusable wrapper that compresses data written to an underlying sink. The standard library's *gzip.Writer implements Compressor.

type Decompressor

type Decompressor interface {
	io.Reader

	// Close closes the Decompressor, but not the underlying data source. It may
	// return an error if the Decompressor wasn't read to EOF.
	Close() error

	// Reset discards the Decompressor's internal state, if any, and prepares it
	// to read from a new source of compressed data.
	Reset(io.Reader) error
}

A Decompressor is a reusable wrapper that decompresses an underlying data source. The standard library's *gzip.Reader implements Decompressor.

type Error

type Error struct {
	// contains filtered or unexported fields
}

An Error captures three key pieces of information: a Code, an underlying Go error, and an optional collection of arbitrary protobuf messages called "details" (more on those below). Servers send the code, the underlying error's Error() output, and details over the wire to clients. Remember that the underlying error's message will be sent to clients - take care not to leak sensitive information from public APIs!

Service implementations and interceptors should return errors that can be cast to an *Error (using the standard library's errors.As). If the returned error can't be cast to an *Error, connect will use CodeUnknown and the returned error's message.

Error details were introduced before gRPC adopted a formal proposal process, so they're not clearly documented anywhere and may differ slightly between implementations. Roughly, they're an optional mechanism for servers, middleware, and proxies to attach arbitrary protobuf messages to the error code and message.

func NewError

func NewError(c Code, underlying error) *Error

NewError annotates any Go error with a status code.

func (*Error) AddDetail

func (e *Error) AddDetail(d ErrorDetail)

AddDetail appends a message to the error's details.

func (*Error) Code

func (e *Error) Code() Code

Code returns the error's status code.

func (*Error) Details

func (e *Error) Details() []ErrorDetail

Details returns the error's details.

func (*Error) Error

func (e *Error) Error() string

func (*Error) Meta

func (e *Error) Meta() http.Header

Meta allows the error to carry additional information as key-value pairs.

Metadata written by handlers may be sent as HTTP headers, HTTP trailers, or a block of in-body metadata, depending on the protocol in use and whether or not the handler has already written messages to the stream.

When clients receive errors, the metadata contains the union of the HTTP headers, HTTP trailers, and in-body metadata, if any.

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap implements errors.Wrapper, which allows errors.Is and errors.As access to the underlying error.

type ErrorDetail

type ErrorDetail interface {
	proto.Message

	MessageName() protoreflect.FullName
	UnmarshalTo(proto.Message) error
}

An ErrorDetail is a self-describing protobuf message attached to an *Error. Error details are sent over the network to clients, which can then work with strongly-typed data rather than trying to parse a complex error message.

The ErrorDetail interface is implemented by protobuf's Any type, provided in Go by the google.golang.org/protobuf/types/known/anypb package. The google.golang.org/genproto/googleapis/rpc/errdetails package contains a variety of protobuf messages commonly wrapped in anypb.Any and used as error details.

type HTTPClient

type HTTPClient interface {
	Do(*http.Request) (*http.Response, error)
}

HTTPClient is the transport-level interface connect expects HTTP clients to implement. The standard library's http.Client implements HTTPClient.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

A Handler is the server-side implementation of a single RPC defined by a protocol buffer service.

By default, Handlers support the gRPC and gRPC-Web protocols with the binary protobuf and JSON codecs. They support gzip compression using the standard library's compress/gzip.

func NewBidiStreamHandler

func NewBidiStreamHandler[Req, Res any](
	procedure string,
	implementation func(context.Context, *BidiStream[Req, Res]) error,
	options ...HandlerOption,
) *Handler

NewBidiStreamHandler constructs a Handler for a bidirectional streaming procedure.

func NewClientStreamHandler

func NewClientStreamHandler[Req, Res any](
	procedure string,
	implementation func(context.Context, *ClientStream[Req, Res]) error,
	options ...HandlerOption,
) *Handler

NewClientStreamHandler constructs a Handler for a client streaming procedure.

func NewServerStreamHandler

func NewServerStreamHandler[Req, Res any](
	procedure string,
	implementation func(context.Context, *Request[Req], *ServerStream[Res]) error,
	options ...HandlerOption,
) *Handler

NewServerStreamHandler constructs a Handler for a server streaming procedure.

func NewUnaryHandler

func NewUnaryHandler[Req, Res any](
	procedure string,
	unary func(context.Context, *Request[Req]) (*Response[Res], error),
	options ...HandlerOption,
) *Handler

NewUnaryHandler constructs a Handler for a request-response procedure.

func (*Handler) ServeHTTP

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements http.Handler.

type HandlerOption

type HandlerOption interface {
	// contains filtered or unexported methods
}

A HandlerOption configures a Handler.

In addition to any options grouped in the documentation below, remember that all Options are also HandlerOptions.

func WithHandlerOptions

func WithHandlerOptions(options ...HandlerOption) HandlerOption

WithHandlerOptions composes multiple HandlerOptions into one.

type Interceptor

type Interceptor interface {
	// WrapUnary adds logic to a unary procedure. The returned UnaryFunc must be safe
	// to call concurrently.
	WrapUnary(UnaryFunc) UnaryFunc

	// WrapStreamContext, WrapStreamSender, and WrapStreamReceiver work together
	// to add logic to streaming procedures. Stream interceptors work in phases.
	// First, each interceptor may wrap the request context. Then, the connect
	// runtime constructs a (Sender, Receiver) pair. Finally, each interceptor
	// may wrap the Sender and/or Receiver. For example, the flow within a
	// Handler looks like this:
	//
	//   func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	//     ctx := r.Context()
	//     if ic := h.interceptor; ic != nil {
	//       ctx = ic.WrapStreamContext(ctx)
	//     }
	//     sender, receiver := h.newStream(w, r.WithContext(ctx))
	//     if ic := h.interceptor; ic != nil {
	//       sender = ic.WrapStreamSender(ctx, sender)
	//       receiver = ic.WrapStreamReceiver(ctx, receiver)
	//     }
	//     h.serveStream(sender, receiver)
	//   }
	//
	// Sender and Receiver implementations don't need to be safe for concurrent
	// use.
	WrapStreamContext(context.Context) context.Context
	WrapStreamSender(context.Context, Sender) Sender
	WrapStreamReceiver(context.Context, Receiver) Receiver
}

An Interceptor adds logic to a generated handler or client, like the decorators or middleware you may have seen in other libraries. Interceptors may replace the context, mutate the request, mutate the response, handle the returned error, retry, recover from panics, emit logs and metrics, or do nearly anything else.

Example
logger := log.New(os.Stdout, "" /* prefix */, 0 /* flags */)
loggingInterceptor := connect.UnaryInterceptorFunc(
	func(next connect.UnaryFunc) connect.UnaryFunc {
		return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
			logger.Println("calling:", req.Spec().Procedure)
			logger.Println("request:", req.Any())
			res, err := next(ctx, req)
			if err != nil {
				logger.Println("error:", err)
			} else {
				logger.Println("response:", res.Any())
			}
			return res, err
		})
	},
)
client, err := pingv1connect.NewPingServiceClient(
	examplePingServer.Client(),
	examplePingServer.URL(),
	connect.WithGRPC(),
	connect.WithInterceptors(loggingInterceptor),
)
if err != nil {
	logger.Println("error:", err)
	return
}
client.Ping(context.Background(), connect.NewRequest(&pingv1.PingRequest{Number: 42}))
Output:

calling: /connect.ping.v1.PingService/Ping
request: number:42
response: number:42

type Option

type Option interface {
	ClientOption
	HandlerOption
}

Option implements both ClientOption and HandlerOption, so it can be applied both client-side and server-side.

func WithCodec

func WithCodec(codec Codec) Option

WithCodec registers a serialization method with a client or handler. Registering a codec with an empty name is a no-op.

Typically, generated code automatically supplies this option with the appropriate codec(s). For example, handlers generated from protobuf schemas using protoc-gen-connect-go automatically register binary and JSON codecs. Users with more specialized needs may override the default codecs by registering a new codec under the same name.

Handlers may have multiple codecs registered, and use whichever the client chooses. Clients may only have a single codec.

func WithCompressMinBytes

func WithCompressMinBytes(min int) Option

WithCompressMinBytes sets a minimum size threshold for compression: regardless of compressor configuration, messages smaller than the configured minimum are sent uncompressed.

The default minimum is zero. Setting a minimum compression threshold may improve overall performance, because the CPU cost of compressing very small messages usually isn't worth the small reduction in network I/O.

func WithCompression

func WithCompression[D Decompressor, C Compressor](
	name string,
	newDecompressor func() D,
	newCompressor func() C,
) Option

WithCompression configures client and server compression strategies. The Compressors and Decompressors produced by the supplied constructors must use the same algorithm.

For handlers, WithCompression registers a compression algorithm. Clients may send messages compressed with that algorithm and/or request compressed responses.

For clients, WithCompression serves two purposes. First, the client asks servers to compress responses using any of the registered algorithms. (gRPC's compression negotiation is complex, but most of Google's gRPC server implementations won't compress responses unless the request is compressed.) Second, it makes all the registered algorithms available for use with WithRequestCompression. Note that actually compressing requests requires using both WithCompression and WithRequestCompression.

Calling WithCompression with an empty name or nil constructors is a no-op.

func WithGzip

func WithGzip() Option

WithGzip registers a gzip compressor backed by the standard library's gzip package with the default compression level.

Handlers with this option applied accept gzipped requests and can send gzipped responses. Clients with this option applied request gzipped responses, but don't automatically send gzipped requests (since the server may not support them). Use WithGzipRequests to gzip requests.

Handlers and clients generated by protoc-gen-connect-go apply WithGzip by default.

func WithInterceptors

func WithInterceptors(interceptors ...Interceptor) Option

WithInterceptors configures a client or handler's interceptor stack. Repeated WithInterceptors options are applied in order, so

WithInterceptors(A) + WithInterceptors(B, C) == WithInterceptors(A, B, C)

Unary interceptors compose like an onion. The first interceptor provided is the outermost layer of the onion: it acts first on the context and request, and last on the response and error.

Stream interceptors also behave like an onion: the first interceptor provided is the first to wrap the context and is the outermost wrapper for the (Sender, Receiver) pair. It's the first to see sent messages and the last to see received messages.

Applied to client and handler, WithInterceptors(A, B, ..., Y, Z) produces:

 client.Send()     client.Receive()
       |                 ^
       v                 |
    A ---               --- A
    B ---               --- B
      ...               ...
    Y ---               --- Y
    Z ---               --- Z
       |                 ^
       v                 |
    network            network
       |                 ^
       v                 |
    A ---               --- A
    B ---               --- B
      ...               ...
    Y ---               --- Y
    Z ---               --- Z
       |                 ^
       v                 |
handler.Receive() handler.Send()
       |                 ^
       |                 |
       -> handler logic --

Note that in clients, the Sender handles the request message(s) and the Receiver handles the response message(s). For handlers, it's the reverse. Depending on your interceptor's logic, you may need to wrap one side of the stream on the clients and the other side on handlers.

Example
logger := log.New(os.Stdout, "" /* prefix */, 0 /* flags */)
outer := connect.UnaryInterceptorFunc(
	func(next connect.UnaryFunc) connect.UnaryFunc {
		return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
			logger.Println("outer interceptor: before call")
			res, err := next(ctx, req)
			logger.Println("outer interceptor: after call")
			return res, err
		})
	},
)
inner := connect.UnaryInterceptorFunc(
	func(next connect.UnaryFunc) connect.UnaryFunc {
		return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) {
			logger.Println("inner interceptor: before call")
			res, err := next(ctx, req)
			logger.Println("inner interceptor: after call")
			return res, err
		})
	},
)
client, err := pingv1connect.NewPingServiceClient(
	examplePingServer.Client(),
	examplePingServer.URL(),
	connect.WithGRPC(),
	connect.WithInterceptors(outer, inner),
)
if err != nil {
	logger.Println("error:", err)
	return
}
client.Ping(context.Background(), connect.NewRequest(&pingv1.PingRequest{}))
Output:

outer interceptor: before call
inner interceptor: before call
inner interceptor: after call
outer interceptor: after call

func WithOptions

func WithOptions(options ...Option) Option

WithOptions composes multiple Options into one.

func WithProtoBinaryCodec

func WithProtoBinaryCodec() Option

WithProtoBinaryCodec registers a binary protocol buffer codec that uses google.golang.org/protobuf/proto.

Handlers and clients generated by protoc-gen-connect-go have WithProtoBinaryCodec applied by default. To replace the default binary protobuf codec (with vtprotobuf, for example), apply WithCodec with a Codec whose name is "proto".

func WithProtoJSONCodec

func WithProtoJSONCodec() Option

WithProtoJSONCodec registers a codec that serializes protocol buffer messages as JSON. It uses the standard protobuf JSON mapping as implemented by google.golang.org/protobuf/encoding/protojson: fields are named using lowerCamelCase, zero values are omitted, missing required fields are errors, enums are emitted as strings, etc.

Handlers generated by protoc-gen-connect-go have WithProtoJSONCodec applied by default.

func WithReadMaxBytes

func WithReadMaxBytes(n int64) Option

WithReadMaxBytes limits the performance impact of pathologically large messages sent by the other party. For handlers, WithReadMaxBytes limits the size of message that the client can send. For clients, WithReadMaxBytes limits the size of message that the server can respond with. Limits are applied before decompression and apply to each protobuf message, not to the stream as a whole.

Setting WithReadMaxBytes to zero allows any message size. Both clients and handlers default to allowing any request size.

type Receiver

type Receiver interface {
	Receive(any) error
	Close() error

	Spec() Specification
	Header() http.Header
	// Trailers are populated only after Receive returns an error wrapping
	// io.EOF.
	Trailer() http.Header
}

Receiver is the readable side of a bidirectional stream of messages. Receiver implementations do not need to be safe for concurrent use.

Receiver implementations provided by this module guarantee that all returned errors can be cast to *Error using errors.As.

type Request

type Request[T any] struct {
	Msg *T
	// contains filtered or unexported fields
}

Request is a wrapper around a generated request message. It provides access to metadata like headers and the RPC specification, as well as strongly-typed access to the message itself.

func NewRequest

func NewRequest[T any](message *T) *Request[T]

NewRequest wraps a generated request message.

func (*Request[_]) Any

func (r *Request[_]) Any() any

Any returns the concrete request message as an empty interface, so that *Request implements the AnyRequest interface.

func (*Request[_]) Header

func (r *Request[_]) Header() http.Header

Header returns the HTTP headers for this request.

func (*Request[_]) Spec

func (r *Request[_]) Spec() Specification

Spec returns the Specification for this RPC.

type Response

type Response[T any] struct {
	Msg *T
	// contains filtered or unexported fields
}

Response is a wrapper around a generated response message. It provides access to metadata like headers and trailers, as well as strongly-typed access to the message itself.

func NewResponse

func NewResponse[T any](message *T) *Response[T]

NewResponse wraps a generated response message.

func (*Response[_]) Any

func (r *Response[_]) Any() any

Any returns the concrete response message as an empty interface, so that *Response implements the AnyResponse interface.

func (*Response[_]) Header

func (r *Response[_]) Header() http.Header

Header returns the HTTP headers for this response.

func (*Response[_]) Trailer

func (r *Response[_]) Trailer() http.Header

Trailer returns the trailers for this response. Depending on the underlying RPC protocol, trailers may be sent as HTTP trailers or a protocol-specific block of in-body metadata.

type Sender

type Sender interface {
	Send(any) error
	Close(error) error

	Spec() Specification
	Header() http.Header
	Trailer() http.Header
}

Sender is the writable side of a bidirectional stream of messages. Sender implementations do not need to be safe for concurrent use.

Sender implementations provided by this module guarantee that all returned errors can be cast to *Error using errors.As. The Close method of Sender implementations provided by this module automatically adds the appropriate codes when passed context.DeadlineExceeded or context.Canceled.

Like the standard library's http.ResponseWriter, both client- and handler-side Senders write headers to the network with the first call to Send. Any subsequent mutations to the headers are effectively no-ops.

Handler-side Senders may mutate trailers until calling Close, when the trailers are written to the network. Clients should avoid sending trailers: usage is nuanced, protocol-specific, and will likely create incompatibilities with other gRPC implementations.

Once servers return an error, they're not interested in receiving additional messages and clients should stop sending them. Client-side Senders indicate this by returning a wrapped io.EOF from Send. Clients should check for this condition with the standard library's errors.Is and call the receiver's Receive method to unmarshal the error.

type ServerStream

type ServerStream[Res any] struct {
	// contains filtered or unexported fields
}

ServerStream is the handler's view of a server streaming RPC.

func NewServerStream

func NewServerStream[Res any](s Sender) *ServerStream[Res]

NewServerStream constructs the handler's view of a server streaming RPC.

func (*ServerStream[Res]) ResponseHeader

func (s *ServerStream[Res]) ResponseHeader() http.Header

ResponseHeader returns the response headers. Headers are sent with the first call to Send.

func (*ServerStream[Res]) ResponseTrailer

func (s *ServerStream[Res]) ResponseTrailer() http.Header

ResponseTrailer returns the response trailers. Handlers may write to the response trailers at any time before returning.

func (*ServerStream[Res]) Send

func (s *ServerStream[Res]) Send(msg *Res) error

Send a message to the client. The first call to Send also sends the response headers.

type ServerStreamForClient

type ServerStreamForClient[Res any] struct {
	// contains filtered or unexported fields
}

ServerStreamForClient is the client's view of a server streaming RPC.

func NewServerStreamForClient

func NewServerStreamForClient[Res any](r Receiver) *ServerStreamForClient[Res]

NewServerStreamForClient constructs the client's view of a server streaming RPC.

func (*ServerStreamForClient[Res]) Close

func (s *ServerStreamForClient[Res]) Close() error

Close the receive side of the stream.

func (*ServerStreamForClient[Res]) Err

func (s *ServerStreamForClient[Res]) Err() error

Err returns the first non-EOF error that was encountered by Receive.

func (*ServerStreamForClient[Res]) Msg

func (s *ServerStreamForClient[Res]) Msg() *Res

Msg returns the most recent message unmarshaled by a call to Receive. The returned message points to data that will be overwritten by the next call to Receive.

func (*ServerStreamForClient[Res]) Receive

func (s *ServerStreamForClient[Res]) Receive() bool

Receive advances the stream to the next message, which will then be available through the Msg method. It returns false when the stream stops, either by reaching the end or by encountering an unexpected error. After Receive returns false, the Err method will return any unexpected error encountered.

func (*ServerStreamForClient[Res]) ResponseHeader

func (s *ServerStreamForClient[Res]) ResponseHeader() http.Header

ResponseHeader returns the headers received from the server. It blocks until the first call to Receive returns.

func (*ServerStreamForClient[Res]) ResponseTrailer

func (s *ServerStreamForClient[Res]) ResponseTrailer() http.Header

ResponseTrailer returns the trailers received from the server. Trailers aren't fully populated until Receive() returns an error wrapping io.EOF.

type Specification

type Specification struct {
	StreamType StreamType
	Procedure  string // e.g., "/acme.foo.v1.FooService/Bar"
	IsClient   bool   // otherwise we're in a handler
}

Specification is a description of a client call or a handler invocation.

type StreamType

type StreamType uint8

StreamType describes whether the client, server, neither, or both is streaming.

type UnaryFunc

type UnaryFunc func(context.Context, AnyRequest) (AnyResponse, error)

UnaryFunc is the generic signature of a unary RPC. Interceptors wrap Funcs.

The type of the request and response structs depend on the codec being used. When using protobuf, request.Any() and response.Any() will always be proto.Message implementations.

type UnaryInterceptorFunc

type UnaryInterceptorFunc func(UnaryFunc) UnaryFunc

UnaryInterceptorFunc is a simple Interceptor implementation that only wraps unary RPCs. It has no effect on client, server, or bidirectional streaming RPCs.

func (UnaryInterceptorFunc) WrapStreamContext

func (f UnaryInterceptorFunc) WrapStreamContext(ctx context.Context) context.Context

WrapStreamContext implements Interceptor with a no-op.

func (UnaryInterceptorFunc) WrapStreamReceiver

func (f UnaryInterceptorFunc) WrapStreamReceiver(_ context.Context, receiver Receiver) Receiver

WrapStreamReceiver implements Interceptor with a no-op.

func (UnaryInterceptorFunc) WrapStreamSender

func (f UnaryInterceptorFunc) WrapStreamSender(_ context.Context, sender Sender) Sender

WrapStreamSender implements Interceptor with a no-op.

func (UnaryInterceptorFunc) WrapUnary

func (f UnaryInterceptorFunc) WrapUnary(next UnaryFunc) UnaryFunc

WrapUnary implements Interceptor by applying the interceptor function.

Directories

Path Synopsis
cmd
protoc-gen-connect-go
protoc-gen-connect-go is a plugin for the Protobuf compiler that generates Go code.
protoc-gen-connect-go is a plugin for the Protobuf compiler that generates Go code.
internal
assert
Package assert is a minimal assert package using generics.
Package assert is a minimal assert package using generics.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL