Documentation
¶
Overview ¶
Package connect is an RPC framework built on protocol buffers and net/http. It's wire-compatible with gRPC and gRPC-Web, including support for streaming.
This documentation is intended to explain each type and function in isolation. For walkthroughs, comparisons to grpc-go, and other narrative docs, see https://bufconnect.com.
Example (Client) ¶
logger := log.New(os.Stdout, "" /* prefix */, 0 /* flags */) // Timeouts, connection pooling, custom dialers, and other low-level // transport details are handled by net/http. Everything you already know // (or everything you learn) about hardening net/http Clients applies to // connect too. // // Of course, you can skip this configuration and use http.DefaultClient for // quick proof-of-concept code. httpClient := &http.Client{ Timeout: 5 * time.Second, Transport: &http.Transport{ Proxy: nil, // connect handles compression on a per-message basis, so it's a waste to // compress the whole response body. DisableCompression: true, MaxIdleConns: 128, // RPC clients tend to make many requests to few hosts, so allow more // idle connections per host. MaxIdleConnsPerHost: 16, IdleConnTimeout: 90 * time.Second, MaxResponseHeaderBytes: 8 * 1024, // 8 KiB, gRPC's recommended setting }, CheckRedirect: func(_ *http.Request, _ []*http.Request) error { // Don't follow any redirects. return http.ErrUseLastResponse }, } // Unfortunately, pkg.go.dev can't run examples that actually use the // network. To keep this example runnable, we'll use an HTTP server and // client that communicate over in-memory pipes. Don't do this in production! httpClient = examplePingServer.Client() client, err := pingv1connect.NewPingServiceClient( httpClient, examplePingServer.URL(), connect.WithGRPC(), ) if err != nil { logger.Println("error:", err) return } res, err := client.Ping( context.Background(), connect.NewRequest(&pingv1.PingRequest{Number: 42}), ) if err != nil { logger.Println("error:", err) return } logger.Println("response content-type:", res.Header().Get("Content-Type")) logger.Println("response message:", res.Msg)
Output: response content-type: application/grpc+proto response message: number:42
Example (Handler) ¶
package main import ( "context" "net/http" "time" "github.com/bufbuild/connect" "github.com/bufbuild/connect/internal/gen/connect/connect/ping/v1/pingv1connect" pingv1 "github.com/bufbuild/connect/internal/gen/go/connect/ping/v1" ) // ExamplePingServer implements some trivial business logic. The protobuf // definition for this API is in proto/connect/ping/v1/ping.proto. type ExamplePingServer struct { pingv1connect.UnimplementedPingServiceHandler } // Ping implements pingv1connect.PingServiceHandler. func (*ExamplePingServer) Ping( _ context.Context, req *connect.Request[pingv1.PingRequest], ) (*connect.Response[pingv1.PingResponse], error) { return connect.NewResponse(&pingv1.PingResponse{ Number: req.Msg.Number, Text: req.Msg.Text, }), nil } func main() { // The business logic here is trivial, but the rest of the example is meant // to be somewhat realistic. This server has basic timeouts configured, and // it also exposes gRPC's server reflection and health check APIs. // protoc-gen-connect-go generates constructors that return plain net/http // Handlers, so they're compatible with most Go HTTP routers and middleware // (for example, net/http's StripPrefix). mux := http.NewServeMux() mux.Handle(pingv1connect.NewPingServiceHandler( &ExamplePingServer{}, // our business logic connect.WithReadMaxBytes(1024*1024), // limit request size )) // You can serve gRPC's health and server reflection APIs using // github.com/bufbuild/connect-ecosystem-go. // Timeouts, connection handling, TLS configuration, and other low-level // transport details are handled by net/http. Everything you already know (or // anything you learn) about hardening net/http Servers applies to connect // too. Keep in mind that any timeouts you set will also apply to streaming // RPCs! // // If you're not familiar with the many timeouts exposed by net/http, start with // https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/. srv := &http.Server{ Addr: ":http", Handler: mux, ReadTimeout: 2500 * time.Millisecond, WriteTimeout: 5 * time.Second, MaxHeaderBytes: 8 * 1024, // 8KiB, gRPC's recommendation } // You could also use golang.org/x/net/http2/h2c to serve gRPC requests // without TLS. srv.ListenAndServeTLS("testdata/server.crt", "testdata/server.key") }
Index ¶
- Constants
- func DecodeBinaryHeader(data string) ([]byte, error)
- func EncodeBinaryHeader(data []byte) string
- type AnyRequest
- type AnyResponse
- type BidiStream
- type BidiStreamForClient
- func (b *BidiStreamForClient[Req, Res]) CloseReceive() error
- func (b *BidiStreamForClient[Req, Res]) CloseSend() error
- func (b *BidiStreamForClient[Req, Res]) Receive() (*Res, error)
- func (b *BidiStreamForClient[Req, Res]) RequestHeader() http.Header
- func (b *BidiStreamForClient[Req, Res]) ResponseHeader() http.Header
- func (b *BidiStreamForClient[Req, Res]) ResponseTrailer() http.Header
- func (b *BidiStreamForClient[Req, Res]) Send(msg *Req) error
- type Client
- func (c *Client[Req, Res]) CallBidiStream(ctx context.Context) *BidiStreamForClient[Req, Res]
- func (c *Client[Req, Res]) CallClientStream(ctx context.Context) *ClientStreamForClient[Req, Res]
- func (c *Client[Req, Res]) CallServerStream(ctx context.Context, req *Request[Req]) (*ServerStreamForClient[Res], error)
- func (c *Client[Req, Res]) CallUnary(ctx context.Context, req *Request[Req]) (*Response[Res], error)
- type ClientOption
- type ClientStream
- type ClientStreamForClient
- type Code
- type Codec
- type Compressor
- type Decompressor
- type Error
- type ErrorDetail
- type HTTPClient
- type Handler
- type HandlerOption
- type Interceptor
- type Option
- func WithCodec(codec Codec) Option
- func WithCompressMinBytes(min int) Option
- func WithCompression[D Decompressor, C Compressor](name string, newDecompressor func() D, newCompressor func() C) Option
- func WithGzip() Option
- func WithInterceptors(interceptors ...Interceptor) Option
- func WithOptions(options ...Option) Option
- func WithProtoBinaryCodec() Option
- func WithProtoJSONCodec() Option
- func WithReadMaxBytes(n int64) Option
- type Receiver
- type Request
- type Response
- type Sender
- type ServerStream
- type ServerStreamForClient
- func (s *ServerStreamForClient[Res]) Close() error
- func (s *ServerStreamForClient[Res]) Err() error
- func (s *ServerStreamForClient[Res]) Msg() *Res
- func (s *ServerStreamForClient[Res]) Receive() bool
- func (s *ServerStreamForClient[Res]) ResponseHeader() http.Header
- func (s *ServerStreamForClient[Res]) ResponseTrailer() http.Header
- type Specification
- type StreamType
- type UnaryFunc
- type UnaryInterceptorFunc
- func (f UnaryInterceptorFunc) WrapStreamContext(ctx context.Context) context.Context
- func (f UnaryInterceptorFunc) WrapStreamReceiver(_ context.Context, receiver Receiver) Receiver
- func (f UnaryInterceptorFunc) WrapStreamSender(_ context.Context, sender Sender) Sender
- func (f UnaryInterceptorFunc) WrapUnary(next UnaryFunc) UnaryFunc
Examples ¶
Constants ¶
const ( StreamTypeUnary StreamType = 0b00 StreamTypeClient = 0b01 StreamTypeServer = 0b10 StreamTypeBidi = StreamTypeClient | StreamTypeServer )
const IsAtLeastVersion0_0_1 = true
These constants are used in compile-time handshakes with connect's generated code.
const Version = "0.0.1"
Version is the semantic version of the connect module.
Variables ¶
This section is empty.
Functions ¶
func DecodeBinaryHeader ¶
DecodeBinaryHeader base64-decodes the data. It can decode padded or unpadded values.
Binary headers sent by Google's gRPC implementations always have keys ending in "-Bin".
func EncodeBinaryHeader ¶
EncodeBinaryHeader base64-encodes the data. It always emits unpadded values.
For interoperability with Google's gRPC implementations, binary headers should have keys ending in "-Bin".
Types ¶
type AnyRequest ¶
type AnyRequest interface { Any() any Spec() Specification Header() http.Header // contains filtered or unexported methods }
AnyRequest is the common method set of all Requests, regardless of type parameter. It's used in unary interceptors.
To preserve our ability to add methods to this interface without breaking backward compatibility, only types defined in this package can implement AnyRequest.
type AnyResponse ¶
type AnyResponse interface { Any() any Header() http.Header Trailer() http.Header // contains filtered or unexported methods }
AnyResponse is the common method set of all Responses, regardless of type parameter. It's used in unary interceptors.
To preserve our ability to add methods to this interface without breaking backward compatibility, only types defined in this package can implement AnyRequest.
type BidiStream ¶
type BidiStream[Req, Res any] struct { // contains filtered or unexported fields }
BidiStream is the handler's view of a bidirectional streaming RPC.
func NewBidiStream ¶
func NewBidiStream[Req, Res any](s Sender, r Receiver) *BidiStream[Req, Res]
NewBidiStream constructs the handler's view of a bidirectional streaming RPC.
func (*BidiStream[Req, Res]) Receive ¶
func (b *BidiStream[Req, Res]) Receive() (*Req, error)
Receive a message. When the client is done sending messages, Receive will return an error that wraps io.EOF.
func (*BidiStream[Req, Res]) RequestHeader ¶
func (b *BidiStream[Req, Res]) RequestHeader() http.Header
RequestHeader returns the headers received from the client.
func (*BidiStream[Req, Res]) ResponseHeader ¶
func (b *BidiStream[Req, Res]) ResponseHeader() http.Header
ResponseHeader returns the response headers. Headers are sent with the first call to Send.
func (*BidiStream[Req, Res]) ResponseTrailer ¶
func (b *BidiStream[Req, Res]) ResponseTrailer() http.Header
ResponseTrailer returns the response trailers. Handlers may write to the response trailers at any time before returning.
func (*BidiStream[Req, Res]) Send ¶
func (b *BidiStream[Req, Res]) Send(msg *Res) error
Send a message to the client. The first call to Send also sends the response headers.
type BidiStreamForClient ¶
type BidiStreamForClient[Req, Res any] struct { // contains filtered or unexported fields }
BidiStreamForClient is the client's view of a bidirectional streaming RPC.
func NewBidiStreamForClient ¶
func NewBidiStreamForClient[Req, Res any](s Sender, r Receiver) *BidiStreamForClient[Req, Res]
NewBidiStreamForClient constructs the client's view of a bidirectional streaming RPC.
func (*BidiStreamForClient[Req, Res]) CloseReceive ¶
func (b *BidiStreamForClient[Req, Res]) CloseReceive() error
CloseReceive closes the receive side of the stream.
func (*BidiStreamForClient[Req, Res]) CloseSend ¶
func (b *BidiStreamForClient[Req, Res]) CloseSend() error
CloseSend closes the send side of the stream.
func (*BidiStreamForClient[Req, Res]) Receive ¶
func (b *BidiStreamForClient[Req, Res]) Receive() (*Res, error)
Receive a message. When the server is done sending messages and no other errors have occurred, Receive will return an error that wraps io.EOF.
func (*BidiStreamForClient[Req, Res]) RequestHeader ¶
func (b *BidiStreamForClient[Req, Res]) RequestHeader() http.Header
RequestHeader returns the request headers. Headers are sent with the first call to Send.
func (*BidiStreamForClient[Req, Res]) ResponseHeader ¶
func (b *BidiStreamForClient[Req, Res]) ResponseHeader() http.Header
ResponseHeader returns the headers received from the server. It blocks until the first call to Receive returns.
func (*BidiStreamForClient[Req, Res]) ResponseTrailer ¶
func (b *BidiStreamForClient[Req, Res]) ResponseTrailer() http.Header
ResponseTrailer returns the trailers received from the server. Trailers aren't fully populated until Receive() returns an error wrapping io.EOF.
func (*BidiStreamForClient[Req, Res]) Send ¶
func (b *BidiStreamForClient[Req, Res]) Send(msg *Req) error
Send a message to the server. The first call to Send also sends the request headers.
If the server returns an error, Send returns an error that wraps io.EOF. Clients should check for case using the standard library's errors.Is and unmarshal the error using Receive.
type Client ¶
type Client[Req, Res any] struct { // contains filtered or unexported fields }
Client is a reusable, concurrency-safe client for a single procedure. Depending on the procedure's type, use the CallUnary, CallClientStream, CallServerStream, or CallBidiStream method.
By default, clients use the binary protobuf Codec, ask for gzipped responses, and send uncompressed requests. They don't have a default protocol; callers of NewClient or generated client constructors must explicitly choose a protocol with either the WithGRPC or WithGRPCWeb options.
func NewClient ¶
func NewClient[Req, Res any]( httpClient HTTPClient, url string, options ...ClientOption, ) (*Client[Req, Res], error)
NewClient constructs a new Client.
func (*Client[Req, Res]) CallBidiStream ¶
func (c *Client[Req, Res]) CallBidiStream(ctx context.Context) *BidiStreamForClient[Req, Res]
CallBidiStream calls a bidirectional streaming procedure.
func (*Client[Req, Res]) CallClientStream ¶
func (c *Client[Req, Res]) CallClientStream(ctx context.Context) *ClientStreamForClient[Req, Res]
CallClientStream calls a client streaming procedure.
func (*Client[Req, Res]) CallServerStream ¶
func (c *Client[Req, Res]) CallServerStream( ctx context.Context, req *Request[Req], ) (*ServerStreamForClient[Res], error)
CallServerStream calls a server streaming procedure.
type ClientOption ¶
type ClientOption interface {
// contains filtered or unexported methods
}
A ClientOption configures a connect client.
In addition to any options grouped in the documentation below, remember that Options are also valid ClientOptions.
func WithClientOptions ¶
func WithClientOptions(options ...ClientOption) ClientOption
WithClientOptions composes multiple ClientOptions into one.
func WithGRPC ¶
func WithGRPC() ClientOption
WithGRPC configures clients to use the HTTP/2 gRPC protocol.
func WithGRPCWeb ¶
func WithGRPCWeb() ClientOption
WithGRPCWeb configures clients to use the gRPC-Web protocol.
func WithGzipRequests ¶
func WithGzipRequests() ClientOption
WithGzipRequests configures the client to gzip requests. It requires that the client already have a registered gzip compressor (via either WithGzip or WithCompressor).
Because some servers don't support gzip, clients default to sending uncompressed requests.
func WithRequestCompression ¶
func WithRequestCompression(name string) ClientOption
WithRequestCompression configures the client to use the specified algorithm to compress request messages. If the algorithm has not been registered using WithCompression, the generated client constructor will return an error.
Because some servers don't support compression, clients default to sending uncompressed requests.
type ClientStream ¶
type ClientStream[Req, Res any] struct { // contains filtered or unexported fields }
ClientStream is the handler's view of a client streaming RPC.
func NewClientStream ¶
func NewClientStream[Req, Res any](s Sender, r Receiver) *ClientStream[Req, Res]
NewClientStream constructs the handler's view of a client streaming RPC.
func (*ClientStream[Req, Res]) Err ¶
func (c *ClientStream[Req, Res]) Err() error
Err returns the first non-EOF error that was encountered by Receive.
func (*ClientStream[Req, Res]) Msg ¶
func (c *ClientStream[Req, Res]) Msg() *Req
Msg returns the most recent message unmarshaled by a call to Receive. The returned message points to data that will be overwritten by the next call to Receive.
func (*ClientStream[Req, Res]) Receive ¶
func (c *ClientStream[Req, Res]) Receive() bool
Receive advances the stream to the next message, which will then be available through the Msg method. It returns false when the stream stops, either by reaching the end or by encountering an unexpected error. After Receive returns false, the Err method will return any unexpected error encountered.
func (*ClientStream[Req, Res]) RequestHeader ¶
func (c *ClientStream[Req, Res]) RequestHeader() http.Header
RequestHeader returns the headers received from the client.
func (*ClientStream[Req, Res]) SendAndClose ¶
func (c *ClientStream[Req, Res]) SendAndClose(envelope *Response[Res]) error
SendAndClose closes the receive side of the stream, then sends a response back to the client.
type ClientStreamForClient ¶
type ClientStreamForClient[Req, Res any] struct { // contains filtered or unexported fields }
ClientStreamForClient is the client's view of a client streaming RPC.
func NewClientStreamForClient ¶
func NewClientStreamForClient[Req, Res any](s Sender, r Receiver) *ClientStreamForClient[Req, Res]
NewClientStreamForClient constructs the client's view of a client streaming RPC.
func (*ClientStreamForClient[Req, Res]) CloseAndReceive ¶
func (c *ClientStreamForClient[Req, Res]) CloseAndReceive() (*Response[Res], error)
CloseAndReceive closes the send side of the stream and waits for the response.
func (*ClientStreamForClient[Req, Res]) RequestHeader ¶
func (c *ClientStreamForClient[Req, Res]) RequestHeader() http.Header
RequestHeader returns the request headers. Headers are sent to the server with the first call to Send.
func (*ClientStreamForClient[Req, Res]) Send ¶
func (c *ClientStreamForClient[Req, Res]) Send(msg *Req) error
Send a message to the server. The first call to Send also sends the request headers.
If the server returns an error, Send returns an error that wraps io.EOF. Clients should check for case using the standard library's errors.Is and unmarshal the error using CloseAndReceive.
type Code ¶
type Code uint32
A Code is one of gRPC's canonical status codes. There are no user-defined codes, so only the codes enumerated below are valid.
See the specification at https://github.com/grpc/grpc/blob/master/doc/statuscodes.md for detailed descriptions of each code and example usage.
const ( // CodeCanceled indicates that the operation was canceled, typically by the // caller. // // Note that connect follows the gRPC specification (and some, but not all, // implementations) and uses the British "CANCELLED" as CodeCanceled's string // representation rather than the American "CANCELED". CodeCanceled Code = 1 // CodeUnknown indicates that the operation failed for an unknown reason. CodeUnknown Code = 2 // CodeInvalidArgument indicates that client supplied an invalid argument. // // Note that this differs from CodeFailedPrecondition: CodeInvalidArgument // indicates that the argument(s) are problematic regardless of the state of // the system (for example, an invalid URL). CodeInvalidArgument Code = 3 // CodeDeadlineExceeded indicates that deadline expired before the operation // could complete. For operations that change the state of the system, this // error may be returned even if the operation has completed successfully // (but late). CodeDeadlineExceeded Code = 4 // CodeNotFound indicates that some requested entity (for example, a file or // directory) was not found. // // If an operation is denied for an entire class of users, such as gradual // feature rollout or an undocumented allowlist, CodeNotFound may be used. If // a request is denied for some users within a class of users, such as // user-based access control, CodePermissionDenied must be used. CodeNotFound Code = 5 // CodeAlreadyExists indicates that client attempted to create an entity (for // example, a file or directory) that already exists. CodeAlreadyExists Code = 6 // CodePermissionDenied indicates that the caller does'nt have permission to // execute the specified operation. // // CodePermissionDenied must not be used for rejections caused by exhausting // some resource (use CodeResourceExhausted instead). CodePermissionDenied // must not be used if the caller can't be identified (use // CodeUnauthenticated instead). This error code doesn't imply that the // request is valid, the requested entity exists, or other preconditions are // satisfied. CodePermissionDenied Code = 7 // CodeResourceExhausted indicates that some resource has been exhausted. For // example, a per-user quota may be exhausted or the entire file system may // be full. CodeResourceExhausted Code = 8 // CodeFailedPrecondition indicates that the system is not in a state // required for the operation's execution. // // Service implementors can use the following guidelines to decide between // CodeFailedPrecondition, CodeAborted, and CodeUnavailable: // // - Use CodeUnavailable if the client can retry just the failing call. // - Use CodeAborted if the client should retry at a higher level. For // example, if a client-specified test-and-set fails, the client should // restart the whole read-modify-write sequence. // - Use CodeFailedPrecondition if the client should not retry until the // system state has been explicitly fixed. For example, a deleting a // directory on the filesystem might return CodeFailedPrecondition if the // directory still contains files, since the client should not retry unless // they first delete the offending files. CodeFailedPrecondition Code = 9 // CodeAborted indicates that operation was aborted by the system, usually // because of a concurrency issue such as a sequencer check failure or // transaction abort. // // The documentation for CodeFailedPrecondition includes guidelines for // choosing between CodeFailedPrecondition, CodeAborted, and CodeUnavailable. CodeAborted Code = 10 // CodeOutOfRange indicates that the operation was attempted past the valid // range (for example, seeking past end-of-file). // // Unlike CodeInvalidArgument, this error indicates a problem that may be // fixed if the system state changes. For example, a 32-bit file system will // generate CodeInvalidArgument if asked to read at an offset that is not in // the range [0,2^32), but it will generate CodeOutOfRange if asked to read // from an offset past the current file size. // // CodeOutOfRange naturally overlaps with CodeFailedPrecondition. Where // possible, use the more specific CodeOutOfRange so that callers who are // iterating through a space can easily detect when they're done. CodeOutOfRange Code = 11 // CodeUnimplemented indicates that the operation isn't implemented, // supported, or enabled in this service. CodeUnimplemented Code = 12 // CodeInternal indicates that some invariants expected by the underlying // system have been broken. This code is reserved for serious errors. CodeInternal Code = 13 // is usually temporary, so clients can back off and retry idempotent // operations. CodeUnavailable Code = 14 // CodeDataLoss indicates that the operation has resulted in unrecoverable // data loss or corruption. CodeDataLoss Code = 15 // CodeUnauthenticated indicates that the request does not have valid // authentication credentials for the operation. CodeUnauthenticated Code = 16 )
type Codec ¶
type Codec interface { // Name returns the name of the Codec. // // This may be used as part of the Content-Type within HTTP. For example, // with gRPC this is the content subtype, that is "application/grpc+proto" // will map to the Codec with name "proto". // // Names are expected to not be empty. Name() string // Marshal marshals the given message. // // Marshal may expect a specific type of message, and will error if this type is not given. Marshal(any) ([]byte, error) // Marshal unmarshals the given message. // // Unmarshal may expect a specific type of message, and will error if this type is not given. Unmarshal([]byte, any) error }
Codec marshals structs (typically generated from a schema) to and from bytes.
type Compressor ¶
type Compressor interface { io.Writer // Close flushes any buffered data to the underlying sink, then closes the // Compressor. It must not close the underlying sink. Close() error // Reset discards the Compressor's internal state, if any, and prepares it to // write compressed data to a new sink. Reset(io.Writer) }
A Compressor is a reusable wrapper that compresses data written to an underlying sink. The standard library's *gzip.Writer implements Compressor.
type Decompressor ¶
type Decompressor interface { io.Reader // Close closes the Decompressor, but not the underlying data source. It may // return an error if the Decompressor wasn't read to EOF. Close() error // Reset discards the Decompressor's internal state, if any, and prepares it // to read from a new source of compressed data. Reset(io.Reader) error }
A Decompressor is a reusable wrapper that decompresses an underlying data source. The standard library's *gzip.Reader implements Decompressor.
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
An Error captures three key pieces of information: a Code, an underlying Go error, and an optional collection of arbitrary protobuf messages called "details" (more on those below). Servers send the code, the underlying error's Error() output, and details over the wire to clients. Remember that the underlying error's message will be sent to clients - take care not to leak sensitive information from public APIs!
Service implementations and interceptors should return errors that can be cast to an *Error (using the standard library's errors.As). If the returned error can't be cast to an *Error, connect will use CodeUnknown and the returned error's message.
Error details were introduced before gRPC adopted a formal proposal process, so they're not clearly documented anywhere and may differ slightly between implementations. Roughly, they're an optional mechanism for servers, middleware, and proxies to attach arbitrary protobuf messages to the error code and message.
func (*Error) AddDetail ¶
func (e *Error) AddDetail(d ErrorDetail)
AddDetail appends a message to the error's details.
func (*Error) Details ¶
func (e *Error) Details() []ErrorDetail
Details returns the error's details.
func (*Error) Meta ¶
Meta allows the error to carry additional information as key-value pairs.
Metadata written by handlers may be sent as HTTP headers, HTTP trailers, or a block of in-body metadata, depending on the protocol in use and whether or not the handler has already written messages to the stream.
When clients receive errors, the metadata contains the union of the HTTP headers, HTTP trailers, and in-body metadata, if any.
type ErrorDetail ¶
type ErrorDetail interface { proto.Message MessageName() protoreflect.FullName UnmarshalTo(proto.Message) error }
An ErrorDetail is a self-describing protobuf message attached to an *Error. Error details are sent over the network to clients, which can then work with strongly-typed data rather than trying to parse a complex error message.
The ErrorDetail interface is implemented by protobuf's Any type, provided in Go by the google.golang.org/protobuf/types/known/anypb package. The google.golang.org/genproto/googleapis/rpc/errdetails package contains a variety of protobuf messages commonly wrapped in anypb.Any and used as error details.
type HTTPClient ¶
HTTPClient is the transport-level interface connect expects HTTP clients to implement. The standard library's http.Client implements HTTPClient.
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
A Handler is the server-side implementation of a single RPC defined by a protocol buffer service.
By default, Handlers support the gRPC and gRPC-Web protocols with the binary protobuf and JSON codecs. They support gzip compression using the standard library's compress/gzip.
func NewBidiStreamHandler ¶
func NewBidiStreamHandler[Req, Res any]( procedure string, implementation func(context.Context, *BidiStream[Req, Res]) error, options ...HandlerOption, ) *Handler
NewBidiStreamHandler constructs a Handler for a bidirectional streaming procedure.
func NewClientStreamHandler ¶
func NewClientStreamHandler[Req, Res any]( procedure string, implementation func(context.Context, *ClientStream[Req, Res]) error, options ...HandlerOption, ) *Handler
NewClientStreamHandler constructs a Handler for a client streaming procedure.
func NewServerStreamHandler ¶
func NewServerStreamHandler[Req, Res any]( procedure string, implementation func(context.Context, *Request[Req], *ServerStream[Res]) error, options ...HandlerOption, ) *Handler
NewServerStreamHandler constructs a Handler for a server streaming procedure.
type HandlerOption ¶
type HandlerOption interface {
// contains filtered or unexported methods
}
A HandlerOption configures a Handler.
In addition to any options grouped in the documentation below, remember that all Options are also HandlerOptions.
func WithHandlerOptions ¶
func WithHandlerOptions(options ...HandlerOption) HandlerOption
WithHandlerOptions composes multiple HandlerOptions into one.
type Interceptor ¶
type Interceptor interface { // WrapUnary adds logic to a unary procedure. The returned UnaryFunc must be safe // to call concurrently. WrapUnary(UnaryFunc) UnaryFunc // WrapStreamContext, WrapStreamSender, and WrapStreamReceiver work together // to add logic to streaming procedures. Stream interceptors work in phases. // First, each interceptor may wrap the request context. Then, the connect // runtime constructs a (Sender, Receiver) pair. Finally, each interceptor // may wrap the Sender and/or Receiver. For example, the flow within a // Handler looks like this: // // func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // ctx := r.Context() // if ic := h.interceptor; ic != nil { // ctx = ic.WrapStreamContext(ctx) // } // sender, receiver := h.newStream(w, r.WithContext(ctx)) // if ic := h.interceptor; ic != nil { // sender = ic.WrapStreamSender(ctx, sender) // receiver = ic.WrapStreamReceiver(ctx, receiver) // } // h.serveStream(sender, receiver) // } // // Sender and Receiver implementations don't need to be safe for concurrent // use. WrapStreamContext(context.Context) context.Context WrapStreamSender(context.Context, Sender) Sender WrapStreamReceiver(context.Context, Receiver) Receiver }
An Interceptor adds logic to a generated handler or client, like the decorators or middleware you may have seen in other libraries. Interceptors may replace the context, mutate the request, mutate the response, handle the returned error, retry, recover from panics, emit logs and metrics, or do nearly anything else.
Example ¶
logger := log.New(os.Stdout, "" /* prefix */, 0 /* flags */) loggingInterceptor := connect.UnaryInterceptorFunc( func(next connect.UnaryFunc) connect.UnaryFunc { return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { logger.Println("calling:", req.Spec().Procedure) logger.Println("request:", req.Any()) res, err := next(ctx, req) if err != nil { logger.Println("error:", err) } else { logger.Println("response:", res.Any()) } return res, err }) }, ) client, err := pingv1connect.NewPingServiceClient( examplePingServer.Client(), examplePingServer.URL(), connect.WithGRPC(), connect.WithInterceptors(loggingInterceptor), ) if err != nil { logger.Println("error:", err) return } client.Ping(context.Background(), connect.NewRequest(&pingv1.PingRequest{Number: 42}))
Output: calling: /connect.ping.v1.PingService/Ping request: number:42 response: number:42
type Option ¶
type Option interface { ClientOption HandlerOption }
Option implements both ClientOption and HandlerOption, so it can be applied both client-side and server-side.
func WithCodec ¶
WithCodec registers a serialization method with a client or handler. Registering a codec with an empty name is a no-op.
Typically, generated code automatically supplies this option with the appropriate codec(s). For example, handlers generated from protobuf schemas using protoc-gen-connect-go automatically register binary and JSON codecs. Users with more specialized needs may override the default codecs by registering a new codec under the same name.
Handlers may have multiple codecs registered, and use whichever the client chooses. Clients may only have a single codec.
func WithCompressMinBytes ¶
WithCompressMinBytes sets a minimum size threshold for compression: regardless of compressor configuration, messages smaller than the configured minimum are sent uncompressed.
The default minimum is zero. Setting a minimum compression threshold may improve overall performance, because the CPU cost of compressing very small messages usually isn't worth the small reduction in network I/O.
func WithCompression ¶
func WithCompression[D Decompressor, C Compressor]( name string, newDecompressor func() D, newCompressor func() C, ) Option
WithCompression configures client and server compression strategies. The Compressors and Decompressors produced by the supplied constructors must use the same algorithm.
For handlers, WithCompression registers a compression algorithm. Clients may send messages compressed with that algorithm and/or request compressed responses.
For clients, WithCompression serves two purposes. First, the client asks servers to compress responses using any of the registered algorithms. (gRPC's compression negotiation is complex, but most of Google's gRPC server implementations won't compress responses unless the request is compressed.) Second, it makes all the registered algorithms available for use with WithRequestCompression. Note that actually compressing requests requires using both WithCompression and WithRequestCompression.
Calling WithCompression with an empty name or nil constructors is a no-op.
func WithGzip ¶
func WithGzip() Option
WithGzip registers a gzip compressor backed by the standard library's gzip package with the default compression level.
Handlers with this option applied accept gzipped requests and can send gzipped responses. Clients with this option applied request gzipped responses, but don't automatically send gzipped requests (since the server may not support them). Use WithGzipRequests to gzip requests.
Handlers and clients generated by protoc-gen-connect-go apply WithGzip by default.
func WithInterceptors ¶
func WithInterceptors(interceptors ...Interceptor) Option
WithInterceptors configures a client or handler's interceptor stack. Repeated WithInterceptors options are applied in order, so
WithInterceptors(A) + WithInterceptors(B, C) == WithInterceptors(A, B, C)
Unary interceptors compose like an onion. The first interceptor provided is the outermost layer of the onion: it acts first on the context and request, and last on the response and error.
Stream interceptors also behave like an onion: the first interceptor provided is the first to wrap the context and is the outermost wrapper for the (Sender, Receiver) pair. It's the first to see sent messages and the last to see received messages.
Applied to client and handler, WithInterceptors(A, B, ..., Y, Z) produces:
client.Send() client.Receive() | ^ v | A --- --- A B --- --- B ... ... Y --- --- Y Z --- --- Z | ^ v | network network | ^ v | A --- --- A B --- --- B ... ... Y --- --- Y Z --- --- Z | ^ v | handler.Receive() handler.Send() | ^ | | -> handler logic --
Note that in clients, the Sender handles the request message(s) and the Receiver handles the response message(s). For handlers, it's the reverse. Depending on your interceptor's logic, you may need to wrap one side of the stream on the clients and the other side on handlers.
Example ¶
logger := log.New(os.Stdout, "" /* prefix */, 0 /* flags */) outer := connect.UnaryInterceptorFunc( func(next connect.UnaryFunc) connect.UnaryFunc { return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { logger.Println("outer interceptor: before call") res, err := next(ctx, req) logger.Println("outer interceptor: after call") return res, err }) }, ) inner := connect.UnaryInterceptorFunc( func(next connect.UnaryFunc) connect.UnaryFunc { return connect.UnaryFunc(func(ctx context.Context, req connect.AnyRequest) (connect.AnyResponse, error) { logger.Println("inner interceptor: before call") res, err := next(ctx, req) logger.Println("inner interceptor: after call") return res, err }) }, ) client, err := pingv1connect.NewPingServiceClient( examplePingServer.Client(), examplePingServer.URL(), connect.WithGRPC(), connect.WithInterceptors(outer, inner), ) if err != nil { logger.Println("error:", err) return } client.Ping(context.Background(), connect.NewRequest(&pingv1.PingRequest{}))
Output: outer interceptor: before call inner interceptor: before call inner interceptor: after call outer interceptor: after call
func WithOptions ¶
WithOptions composes multiple Options into one.
func WithProtoBinaryCodec ¶
func WithProtoBinaryCodec() Option
WithProtoBinaryCodec registers a binary protocol buffer codec that uses google.golang.org/protobuf/proto.
Handlers and clients generated by protoc-gen-connect-go have WithProtoBinaryCodec applied by default. To replace the default binary protobuf codec (with vtprotobuf, for example), apply WithCodec with a Codec whose name is "proto".
func WithProtoJSONCodec ¶
func WithProtoJSONCodec() Option
WithProtoJSONCodec registers a codec that serializes protocol buffer messages as JSON. It uses the standard protobuf JSON mapping as implemented by google.golang.org/protobuf/encoding/protojson: fields are named using lowerCamelCase, zero values are omitted, missing required fields are errors, enums are emitted as strings, etc.
Handlers generated by protoc-gen-connect-go have WithProtoJSONCodec applied by default.
func WithReadMaxBytes ¶
WithReadMaxBytes limits the performance impact of pathologically large messages sent by the other party. For handlers, WithReadMaxBytes limits the size of message that the client can send. For clients, WithReadMaxBytes limits the size of message that the server can respond with. Limits are applied before decompression and apply to each protobuf message, not to the stream as a whole.
Setting WithReadMaxBytes to zero allows any message size. Both clients and handlers default to allowing any request size.
type Receiver ¶
type Receiver interface { Receive(any) error Close() error Spec() Specification Header() http.Header // Trailers are populated only after Receive returns an error wrapping // io.EOF. Trailer() http.Header }
Receiver is the readable side of a bidirectional stream of messages. Receiver implementations do not need to be safe for concurrent use.
Receiver implementations provided by this module guarantee that all returned errors can be cast to *Error using errors.As.
type Request ¶
type Request[T any] struct { Msg *T // contains filtered or unexported fields }
Request is a wrapper around a generated request message. It provides access to metadata like headers and the RPC specification, as well as strongly-typed access to the message itself.
func NewRequest ¶
NewRequest wraps a generated request message.
func (*Request[_]) Any ¶
Any returns the concrete request message as an empty interface, so that *Request implements the AnyRequest interface.
func (*Request[_]) Spec ¶
func (r *Request[_]) Spec() Specification
Spec returns the Specification for this RPC.
type Response ¶
type Response[T any] struct { Msg *T // contains filtered or unexported fields }
Response is a wrapper around a generated response message. It provides access to metadata like headers and trailers, as well as strongly-typed access to the message itself.
func NewResponse ¶
NewResponse wraps a generated response message.
func (*Response[_]) Any ¶
Any returns the concrete response message as an empty interface, so that *Response implements the AnyResponse interface.
type Sender ¶
type Sender interface { Send(any) error Close(error) error Spec() Specification Header() http.Header Trailer() http.Header }
Sender is the writable side of a bidirectional stream of messages. Sender implementations do not need to be safe for concurrent use.
Sender implementations provided by this module guarantee that all returned errors can be cast to *Error using errors.As. The Close method of Sender implementations provided by this module automatically adds the appropriate codes when passed context.DeadlineExceeded or context.Canceled.
Like the standard library's http.ResponseWriter, both client- and handler-side Senders write headers to the network with the first call to Send. Any subsequent mutations to the headers are effectively no-ops.
Handler-side Senders may mutate trailers until calling Close, when the trailers are written to the network. Clients should avoid sending trailers: usage is nuanced, protocol-specific, and will likely create incompatibilities with other gRPC implementations.
Once servers return an error, they're not interested in receiving additional messages and clients should stop sending them. Client-side Senders indicate this by returning a wrapped io.EOF from Send. Clients should check for this condition with the standard library's errors.Is and call the receiver's Receive method to unmarshal the error.
type ServerStream ¶
type ServerStream[Res any] struct { // contains filtered or unexported fields }
ServerStream is the handler's view of a server streaming RPC.
func NewServerStream ¶
func NewServerStream[Res any](s Sender) *ServerStream[Res]
NewServerStream constructs the handler's view of a server streaming RPC.
func (*ServerStream[Res]) ResponseHeader ¶
func (s *ServerStream[Res]) ResponseHeader() http.Header
ResponseHeader returns the response headers. Headers are sent with the first call to Send.
func (*ServerStream[Res]) ResponseTrailer ¶
func (s *ServerStream[Res]) ResponseTrailer() http.Header
ResponseTrailer returns the response trailers. Handlers may write to the response trailers at any time before returning.
func (*ServerStream[Res]) Send ¶
func (s *ServerStream[Res]) Send(msg *Res) error
Send a message to the client. The first call to Send also sends the response headers.
type ServerStreamForClient ¶
type ServerStreamForClient[Res any] struct { // contains filtered or unexported fields }
ServerStreamForClient is the client's view of a server streaming RPC.
func NewServerStreamForClient ¶
func NewServerStreamForClient[Res any](r Receiver) *ServerStreamForClient[Res]
NewServerStreamForClient constructs the client's view of a server streaming RPC.
func (*ServerStreamForClient[Res]) Close ¶
func (s *ServerStreamForClient[Res]) Close() error
Close the receive side of the stream.
func (*ServerStreamForClient[Res]) Err ¶
func (s *ServerStreamForClient[Res]) Err() error
Err returns the first non-EOF error that was encountered by Receive.
func (*ServerStreamForClient[Res]) Msg ¶
func (s *ServerStreamForClient[Res]) Msg() *Res
Msg returns the most recent message unmarshaled by a call to Receive. The returned message points to data that will be overwritten by the next call to Receive.
func (*ServerStreamForClient[Res]) Receive ¶
func (s *ServerStreamForClient[Res]) Receive() bool
Receive advances the stream to the next message, which will then be available through the Msg method. It returns false when the stream stops, either by reaching the end or by encountering an unexpected error. After Receive returns false, the Err method will return any unexpected error encountered.
func (*ServerStreamForClient[Res]) ResponseHeader ¶
func (s *ServerStreamForClient[Res]) ResponseHeader() http.Header
ResponseHeader returns the headers received from the server. It blocks until the first call to Receive returns.
func (*ServerStreamForClient[Res]) ResponseTrailer ¶
func (s *ServerStreamForClient[Res]) ResponseTrailer() http.Header
ResponseTrailer returns the trailers received from the server. Trailers aren't fully populated until Receive() returns an error wrapping io.EOF.
type Specification ¶
type Specification struct { StreamType StreamType Procedure string // e.g., "/acme.foo.v1.FooService/Bar" IsClient bool // otherwise we're in a handler }
Specification is a description of a client call or a handler invocation.
type StreamType ¶
type StreamType uint8
StreamType describes whether the client, server, neither, or both is streaming.
type UnaryFunc ¶
type UnaryFunc func(context.Context, AnyRequest) (AnyResponse, error)
UnaryFunc is the generic signature of a unary RPC. Interceptors wrap Funcs.
The type of the request and response structs depend on the codec being used. When using protobuf, request.Any() and response.Any() will always be proto.Message implementations.
type UnaryInterceptorFunc ¶
UnaryInterceptorFunc is a simple Interceptor implementation that only wraps unary RPCs. It has no effect on client, server, or bidirectional streaming RPCs.
func (UnaryInterceptorFunc) WrapStreamContext ¶
func (f UnaryInterceptorFunc) WrapStreamContext(ctx context.Context) context.Context
WrapStreamContext implements Interceptor with a no-op.
func (UnaryInterceptorFunc) WrapStreamReceiver ¶
func (f UnaryInterceptorFunc) WrapStreamReceiver(_ context.Context, receiver Receiver) Receiver
WrapStreamReceiver implements Interceptor with a no-op.
func (UnaryInterceptorFunc) WrapStreamSender ¶
func (f UnaryInterceptorFunc) WrapStreamSender(_ context.Context, sender Sender) Sender
WrapStreamSender implements Interceptor with a no-op.
func (UnaryInterceptorFunc) WrapUnary ¶
func (f UnaryInterceptorFunc) WrapUnary(next UnaryFunc) UnaryFunc
WrapUnary implements Interceptor by applying the interceptor function.
Source Files
¶
- client.go
- client_stream.go
- code.go
- codec.go
- compression.go
- connect.go
- doc.go
- error.go
- handler.go
- handler_stream.go
- header.go
- interceptor.go
- nop_stream.go
- option.go
- protobuf_util.go
- protocol.go
- protocol_grpc.go
- protocol_grpc_client_stream.go
- protocol_grpc_handler_stream.go
- protocol_grpc_lpm.go
- protocol_grpc_timeout.go
- protocol_grpc_util.go
Directories
¶
Path | Synopsis |
---|---|
cmd
|
|
protoc-gen-connect-go
protoc-gen-connect-go is a plugin for the Protobuf compiler that generates Go code.
|
protoc-gen-connect-go is a plugin for the Protobuf compiler that generates Go code. |
internal
|
|
assert
Package assert is a minimal assert package using generics.
|
Package assert is a minimal assert package using generics. |