Documentation
¶
Overview ¶
Package grpctunnel provides tools to support tunneling of gRPC services: carrying gRPC calls over a gRPC stream.
This support includes "pinning" an RPC channel to a single server, by sending all requests on a single gRPC stream. There are also tools for adapting certain kinds of bidirectional stream RPCs into a stub such that a single stream looks like a sequence of unary calls.
This support also includes "reverse services", where a client initiates a connection to a server and the server can subsequently wrap that connection with an RPC stub, used to send requests from the server to that client (the client then processes them and sends responses back to the server).
Index ¶
- func HandleServerStream(stream grpc.Stream, serveFunc interface{}, ...) error
- func RegisterTunnelServiceServer(s *grpc.Server, srv TunnelServiceServer)
- func ServeReverseTunnel(stream TunnelService_OpenReverseTunnelClient, handlers grpchan.HandlerMap) error
- func ServeTunnel(stream TunnelService_OpenTunnelServer, handlers grpchan.HandlerMap) error
- type ClientToServer
- func (*ClientToServer) Descriptor() ([]byte, []int)
- func (m *ClientToServer) GetCancel() *empty.Empty
- func (m *ClientToServer) GetFrame() isClientToServer_Frame
- func (m *ClientToServer) GetHalfClose() *empty.Empty
- func (m *ClientToServer) GetMoreRequestData() []byte
- func (m *ClientToServer) GetNewStream() *NewStream
- func (m *ClientToServer) GetRequestMessage() *MessageData
- func (m *ClientToServer) GetStreamId() int64
- func (*ClientToServer) ProtoMessage()
- func (m *ClientToServer) Reset()
- func (m *ClientToServer) String() string
- func (m *ClientToServer) XXX_DiscardUnknown()
- func (m *ClientToServer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *ClientToServer) XXX_Merge(src proto.Message)
- func (*ClientToServer) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, ...)
- func (m *ClientToServer) XXX_Size() int
- func (m *ClientToServer) XXX_Unmarshal(b []byte) error
- type ClientToServer_Cancel
- type ClientToServer_HalfClose
- type ClientToServer_MoreRequestData
- type ClientToServer_NewStream
- type ClientToServer_RequestMessage
- type CloseStream
- func (*CloseStream) Descriptor() ([]byte, []int)
- func (m *CloseStream) GetResponseTrailers() *Metadata
- func (m *CloseStream) GetStatus() *status.Status
- func (*CloseStream) ProtoMessage()
- func (m *CloseStream) Reset()
- func (m *CloseStream) String() string
- func (m *CloseStream) XXX_DiscardUnknown()
- func (m *CloseStream) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *CloseStream) XXX_Merge(src proto.Message)
- func (m *CloseStream) XXX_Size() int
- func (m *CloseStream) XXX_Unmarshal(b []byte) error
- type CorruptResponseStreamError
- type MessageData
- func (*MessageData) Descriptor() ([]byte, []int)
- func (m *MessageData) GetData() []byte
- func (m *MessageData) GetSize() int32
- func (*MessageData) ProtoMessage()
- func (m *MessageData) Reset()
- func (m *MessageData) String() string
- func (m *MessageData) XXX_DiscardUnknown()
- func (m *MessageData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *MessageData) XXX_Merge(src proto.Message)
- func (m *MessageData) XXX_Size() int
- func (m *MessageData) XXX_Unmarshal(b []byte) error
- type Metadata
- func (*Metadata) Descriptor() ([]byte, []int)
- func (m *Metadata) GetMd() map[string]*Metadata_Values
- func (*Metadata) ProtoMessage()
- func (m *Metadata) Reset()
- func (m *Metadata) String() string
- func (m *Metadata) XXX_DiscardUnknown()
- func (m *Metadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *Metadata) XXX_Merge(src proto.Message)
- func (m *Metadata) XXX_Size() int
- func (m *Metadata) XXX_Unmarshal(b []byte) error
- type Metadata_Values
- func (*Metadata_Values) Descriptor() ([]byte, []int)
- func (m *Metadata_Values) GetVal() []string
- func (*Metadata_Values) ProtoMessage()
- func (m *Metadata_Values) Reset()
- func (m *Metadata_Values) String() string
- func (m *Metadata_Values) XXX_DiscardUnknown()
- func (m *Metadata_Values) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *Metadata_Values) XXX_Merge(src proto.Message)
- func (m *Metadata_Values) XXX_Size() int
- func (m *Metadata_Values) XXX_Unmarshal(b []byte) error
- type NewStream
- func (*NewStream) Descriptor() ([]byte, []int)
- func (m *NewStream) GetMethodName() string
- func (m *NewStream) GetRequestHeaders() *Metadata
- func (*NewStream) ProtoMessage()
- func (m *NewStream) Reset()
- func (m *NewStream) String() string
- func (m *NewStream) XXX_DiscardUnknown()
- func (m *NewStream) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *NewStream) XXX_Merge(src proto.Message)
- func (m *NewStream) XXX_Size() int
- func (m *NewStream) XXX_Unmarshal(b []byte) error
- type ReverseTunnelChannel
- type ServerToClient
- func (*ServerToClient) Descriptor() ([]byte, []int)
- func (m *ServerToClient) GetCloseStream() *CloseStream
- func (m *ServerToClient) GetFrame() isServerToClient_Frame
- func (m *ServerToClient) GetMoreResponseData() []byte
- func (m *ServerToClient) GetResponseHeaders() *Metadata
- func (m *ServerToClient) GetResponseMessage() *MessageData
- func (m *ServerToClient) GetStreamId() int64
- func (*ServerToClient) ProtoMessage()
- func (m *ServerToClient) Reset()
- func (m *ServerToClient) String() string
- func (m *ServerToClient) XXX_DiscardUnknown()
- func (m *ServerToClient) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (dst *ServerToClient) XXX_Merge(src proto.Message)
- func (*ServerToClient) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, ...)
- func (m *ServerToClient) XXX_Size() int
- func (m *ServerToClient) XXX_Unmarshal(b []byte) error
- type ServerToClient_CloseStream
- type ServerToClient_MoreResponseData
- type ServerToClient_ResponseHeaders
- type ServerToClient_ResponseMessage
- type StreamAdapter
- type TunnelChannel
- func (c *TunnelChannel) Close()
- func (c *TunnelChannel) Context() context.Context
- func (c *TunnelChannel) Done() <-chan struct{}
- func (c *TunnelChannel) Err() error
- func (c *TunnelChannel) Invoke(ctx context.Context, methodName string, req, resp interface{}, ...) error
- func (c *TunnelChannel) IsDone() bool
- func (c *TunnelChannel) NewStream(ctx context.Context, desc *grpc.StreamDesc, methodName string, ...) (grpc.ClientStream, error)
- type TunnelServer
- func (s *TunnelServer) AllReverseTunnels() []*ReverseTunnelChannel
- func (s *TunnelServer) AsChannel() grpchan.Channel
- func (s *TunnelServer) FindChannel(search func(*ReverseTunnelChannel) bool) *ReverseTunnelChannel
- func (s *TunnelServer) KeyAsChannel(key interface{}) grpchan.Channel
- func (s *TunnelServer) OpenReverseTunnel(stream TunnelService_OpenReverseTunnelServer) error
- func (s *TunnelServer) OpenTunnel(stream TunnelService_OpenTunnelServer) error
- func (s *TunnelServer) RegisterService(desc *grpc.ServiceDesc, srv interface{})
- type TunnelServiceClient
- type TunnelServiceServer
- type TunnelService_OpenReverseTunnelClient
- type TunnelService_OpenReverseTunnelServer
- type TunnelService_OpenTunnelClient
- type TunnelService_OpenTunnelServer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func HandleServerStream ¶
func HandleServerStream(stream grpc.Stream, serveFunc interface{}, requestKeyExtractor func(interface{}) interface{}, requestFactory func() interface{}) error
HandleServerStream reads request messages from the given stream, dispatching them to the given serveFunc. The serveFunc must be a function that either accepts one argument and returns one value or accepts two arguments, the first of which must be a context.Context, and returns one value.
The given requestKeyExtractor is used to determine a key. All requests for the same key are processed in FIFO order. If no such extractor is given, then the nil interface is the assumed key, eliminating parallelism. If an extractor is given, requests for different keys will be dispatched to serveFunc concurrently, from different goroutines. It is serveFunc's job to mark its return values with the same key, so that the client can correctly correlate responses with outstanding RPC requests.
The given requestFactory is used to create request instances, into which stream messages are unmarshaled. If nil is supplied, a factory will be created by querying the given stream for a method named "Recv" (via reflection). If it has such a method, it should take no arguments and return a value and an error. The type of that first return value will be used to reflectively construct request instances.
This function returns an error if a call to stream.RecvMsg(), to query for the next request, returns an error.
This function is used on the server side of the stream -- the side that will receive requests and then send responses in reply. (This does not necessarily have to be a gRPC server though, as a stream could allow clients to accept and process requests this way). For the client side of the stream, see NewStreamAdapter.
func RegisterTunnelServiceServer ¶
func RegisterTunnelServiceServer(s *grpc.Server, srv TunnelServiceServer)
func ServeReverseTunnel ¶
func ServeReverseTunnel(stream TunnelService_OpenReverseTunnelClient, handlers grpchan.HandlerMap) error
ServeReverseTunnel uses the given services to handle incoming RPC requests that arrive via the given client tunnel stream. Since this is a reverse tunnel, RPC requests are initiated by the server, and this end (the client) processes the requests and sends responses.
It returns if, in the process of reading requests, it detects invalid usage of the stream (the server sending references to invalid stream IDs or sending frames for a stream ID in improper order) or if the stream itself fails (for example, if the client cancels the tunnel or there is a network disruption).
On return the provided stream should be canceled as soon as possible. Typical usage looks like so:
ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := stub.OpenReverseTunnel(ctx)
if err != nil {
return err
}
return grpctunnel.ServeReverseTunnel(stream, handlers)
func ServeTunnel ¶
func ServeTunnel(stream TunnelService_OpenTunnelServer, handlers grpchan.HandlerMap) error
ServeTunnel uses the given services to handle incoming RPC requests that arrive via the given incoming tunnel stream.
It returns if, in the process of reading requests, it detects invalid usage of the stream (client sending references to invalid stream IDs or sending frames for a stream ID in improper order) or if the stream itself fails (for example, if the client cancels the tunnel or there is a network disruption).
This is typically called from a handler that implements the TunnelService. Typical usage looks like so:
func (h tunnelHandler) OpenTunnel(stream grpctunnel.TunnelService_OpenTunnelServer) error {
return grpctunnel.ServeTunnel(stream, h.handlers)
}
Types ¶
type ClientToServer ¶
type ClientToServer struct {
// The ID of the stream. Unlike in the HTTP/2 protocol, the stream IDs used
// do not have to be used sequentially, in monotonically increasing order,
// and can potentially be re-used (as long as no stream with the same ID is still
// active). However, it is 64-bit so that re-use due to overflow is extremely
// unlikely.
StreamId int64 `protobuf:"varint,1,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"`
// Types that are valid to be assigned to Frame:
// *ClientToServer_NewStream
// *ClientToServer_RequestMessage
// *ClientToServer_MoreRequestData
// *ClientToServer_HalfClose
// *ClientToServer_Cancel
Frame isClientToServer_Frame `protobuf_oneof:"frame"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
ClientToServer is the message a client sends to a server.
For a single stream ID, the first such message must include the new_stream field. After that, there can be any number of requests sent, via the request_message field and additional messages thereafter that use the more_request_data field (for requests that are larger than 16kb). And finally, the RPC ends with either the half_close or cancel fields. If the half_close field is used, the RPC stream remains active so the server may continue to send response data. But, if the cancel field is used, the RPC stream is aborted and thus closed on both client and server ends. If a stream has been half-closed, the only allowed message from the client for that stream ID is one with the cancel field, to abort the remainder of the operation.
func (*ClientToServer) Descriptor ¶
func (*ClientToServer) Descriptor() ([]byte, []int)
func (*ClientToServer) GetCancel ¶
func (m *ClientToServer) GetCancel() *empty.Empty
func (*ClientToServer) GetFrame ¶
func (m *ClientToServer) GetFrame() isClientToServer_Frame
func (*ClientToServer) GetHalfClose ¶
func (m *ClientToServer) GetHalfClose() *empty.Empty
func (*ClientToServer) GetMoreRequestData ¶
func (m *ClientToServer) GetMoreRequestData() []byte
func (*ClientToServer) GetNewStream ¶
func (m *ClientToServer) GetNewStream() *NewStream
func (*ClientToServer) GetRequestMessage ¶
func (m *ClientToServer) GetRequestMessage() *MessageData
func (*ClientToServer) GetStreamId ¶
func (m *ClientToServer) GetStreamId() int64
func (*ClientToServer) ProtoMessage ¶
func (*ClientToServer) ProtoMessage()
func (*ClientToServer) Reset ¶
func (m *ClientToServer) Reset()
func (*ClientToServer) String ¶
func (m *ClientToServer) String() string
func (*ClientToServer) XXX_DiscardUnknown ¶
func (m *ClientToServer) XXX_DiscardUnknown()
func (*ClientToServer) XXX_Marshal ¶
func (m *ClientToServer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*ClientToServer) XXX_Merge ¶
func (dst *ClientToServer) XXX_Merge(src proto.Message)
func (*ClientToServer) XXX_OneofFuncs ¶
func (*ClientToServer) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{})
XXX_OneofFuncs is for the internal use of the proto package.
func (*ClientToServer) XXX_Size ¶
func (m *ClientToServer) XXX_Size() int
func (*ClientToServer) XXX_Unmarshal ¶
func (m *ClientToServer) XXX_Unmarshal(b []byte) error
type ClientToServer_Cancel ¶
type ClientToServer_MoreRequestData ¶
type ClientToServer_MoreRequestData struct {
MoreRequestData []byte `protobuf:"bytes,4,opt,name=more_request_data,json=moreRequestData,proto3,oneof"`
}
type ClientToServer_NewStream ¶
type ClientToServer_NewStream struct {
NewStream *NewStream `protobuf:"bytes,2,opt,name=new_stream,json=newStream,proto3,oneof"`
}
type ClientToServer_RequestMessage ¶
type ClientToServer_RequestMessage struct {
RequestMessage *MessageData `protobuf:"bytes,3,opt,name=request_message,json=requestMessage,proto3,oneof"`
}
type CloseStream ¶
type CloseStream struct {
ResponseTrailers *Metadata `protobuf:"bytes,1,opt,name=response_trailers,json=responseTrailers,proto3" json:"response_trailers,omitempty"`
Status *status.Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (*CloseStream) Descriptor ¶
func (*CloseStream) Descriptor() ([]byte, []int)
func (*CloseStream) GetResponseTrailers ¶
func (m *CloseStream) GetResponseTrailers() *Metadata
func (*CloseStream) GetStatus ¶
func (m *CloseStream) GetStatus() *status.Status
func (*CloseStream) ProtoMessage ¶
func (*CloseStream) ProtoMessage()
func (*CloseStream) Reset ¶
func (m *CloseStream) Reset()
func (*CloseStream) String ¶
func (m *CloseStream) String() string
func (*CloseStream) XXX_DiscardUnknown ¶
func (m *CloseStream) XXX_DiscardUnknown()
func (*CloseStream) XXX_Marshal ¶
func (m *CloseStream) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*CloseStream) XXX_Merge ¶
func (dst *CloseStream) XXX_Merge(src proto.Message)
func (*CloseStream) XXX_Size ¶
func (m *CloseStream) XXX_Size() int
func (*CloseStream) XXX_Unmarshal ¶
func (m *CloseStream) XXX_Unmarshal(b []byte) error
type CorruptResponseStreamError ¶
type CorruptResponseStreamError struct {
// contains filtered or unexported fields
}
CorruptResponseStreamError is an error that occurs when a correlated stream adapter receives a message that cannot be correlated to a pending request.
func (CorruptResponseStreamError) Error ¶
func (e CorruptResponseStreamError) Error() string
Error implements the error interface
type MessageData ¶
type MessageData struct {
// The full size of the message.
Size int32 `protobuf:"varint,1,opt,name=size,proto3" json:"size,omitempty"`
// The message data. This field should not be longer than 16kb (16,384
// bytes). If the full size of the message is larger, then it should be
// split into multiple chunks. The chunking is done to allow multiple
// access to the underlying gRPC stream by concurrent tunneled streams.
// If very large messages were sent via a single chunk, it could cause
// head-of-line blocking and starvation when multiple streams need to send
// data on the one underlying gRPC stream.
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (*MessageData) Descriptor ¶
func (*MessageData) Descriptor() ([]byte, []int)
func (*MessageData) GetData ¶
func (m *MessageData) GetData() []byte
func (*MessageData) GetSize ¶
func (m *MessageData) GetSize() int32
func (*MessageData) ProtoMessage ¶
func (*MessageData) ProtoMessage()
func (*MessageData) Reset ¶
func (m *MessageData) Reset()
func (*MessageData) String ¶
func (m *MessageData) String() string
func (*MessageData) XXX_DiscardUnknown ¶
func (m *MessageData) XXX_DiscardUnknown()
func (*MessageData) XXX_Marshal ¶
func (m *MessageData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*MessageData) XXX_Merge ¶
func (dst *MessageData) XXX_Merge(src proto.Message)
func (*MessageData) XXX_Size ¶
func (m *MessageData) XXX_Size() int
func (*MessageData) XXX_Unmarshal ¶
func (m *MessageData) XXX_Unmarshal(b []byte) error
type Metadata ¶
type Metadata struct {
Md map[string]*Metadata_Values `` /* 145-byte string literal not displayed */
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (*Metadata) Descriptor ¶
func (*Metadata) GetMd ¶
func (m *Metadata) GetMd() map[string]*Metadata_Values
func (*Metadata) ProtoMessage ¶
func (*Metadata) ProtoMessage()
func (*Metadata) XXX_DiscardUnknown ¶
func (m *Metadata) XXX_DiscardUnknown()
func (*Metadata) XXX_Marshal ¶
func (*Metadata) XXX_Unmarshal ¶
type Metadata_Values ¶
type Metadata_Values struct {
Val []string `protobuf:"bytes,1,rep,name=val,proto3" json:"val,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (*Metadata_Values) Descriptor ¶
func (*Metadata_Values) Descriptor() ([]byte, []int)
func (*Metadata_Values) GetVal ¶
func (m *Metadata_Values) GetVal() []string
func (*Metadata_Values) ProtoMessage ¶
func (*Metadata_Values) ProtoMessage()
func (*Metadata_Values) Reset ¶
func (m *Metadata_Values) Reset()
func (*Metadata_Values) String ¶
func (m *Metadata_Values) String() string
func (*Metadata_Values) XXX_DiscardUnknown ¶
func (m *Metadata_Values) XXX_DiscardUnknown()
func (*Metadata_Values) XXX_Marshal ¶
func (m *Metadata_Values) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*Metadata_Values) XXX_Merge ¶
func (dst *Metadata_Values) XXX_Merge(src proto.Message)
func (*Metadata_Values) XXX_Size ¶
func (m *Metadata_Values) XXX_Size() int
func (*Metadata_Values) XXX_Unmarshal ¶
func (m *Metadata_Values) XXX_Unmarshal(b []byte) error
type NewStream ¶
type NewStream struct {
MethodName string `protobuf:"bytes,1,opt,name=method_name,json=methodName,proto3" json:"method_name,omitempty"`
RequestHeaders *Metadata `protobuf:"bytes,2,opt,name=request_headers,json=requestHeaders,proto3" json:"request_headers,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (*NewStream) Descriptor ¶
func (*NewStream) GetMethodName ¶
func (*NewStream) GetRequestHeaders ¶
func (*NewStream) ProtoMessage ¶
func (*NewStream) ProtoMessage()
func (*NewStream) XXX_DiscardUnknown ¶
func (m *NewStream) XXX_DiscardUnknown()
func (*NewStream) XXX_Marshal ¶
func (*NewStream) XXX_Unmarshal ¶
type ReverseTunnelChannel ¶
type ReverseTunnelChannel struct {
*TunnelChannel
Peer *peer.Peer
RequestHeaders metadata.MD
}
func NewReverseChannel ¶
func NewReverseChannel(stream TunnelService_OpenReverseTunnelServer) *ReverseTunnelChannel
type ServerToClient ¶
type ServerToClient struct {
// The ID of the stream. Unlike in the HTTP/2 protocol, the stream IDs used
// do not have to be used sequentially, in monotonically increasing order,
// and can safely be re-used (as long as no stream with the same ID is still
// active).
StreamId int64 `protobuf:"varint,1,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"`
// Types that are valid to be assigned to Frame:
// *ServerToClient_ResponseHeaders
// *ServerToClient_ResponseMessage
// *ServerToClient_MoreResponseData
// *ServerToClient_CloseStream
Frame isServerToClient_Frame `protobuf_oneof:"frame"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
ServerToClient is the message a server sends to a client.
For a single stream ID, the first such message should include the response_headers field unless no headers are to be sent. After the headers, the server can send any number of responses, via the response_message field and additional messages thereafter that use the more_response_data field (for responses that are larger than 16kb). A message with the close_stream field concludes the stream, whether it terminates successfully or with an error.
func (*ServerToClient) Descriptor ¶
func (*ServerToClient) Descriptor() ([]byte, []int)
func (*ServerToClient) GetCloseStream ¶
func (m *ServerToClient) GetCloseStream() *CloseStream
func (*ServerToClient) GetFrame ¶
func (m *ServerToClient) GetFrame() isServerToClient_Frame
func (*ServerToClient) GetMoreResponseData ¶
func (m *ServerToClient) GetMoreResponseData() []byte
func (*ServerToClient) GetResponseHeaders ¶
func (m *ServerToClient) GetResponseHeaders() *Metadata
func (*ServerToClient) GetResponseMessage ¶
func (m *ServerToClient) GetResponseMessage() *MessageData
func (*ServerToClient) GetStreamId ¶
func (m *ServerToClient) GetStreamId() int64
func (*ServerToClient) ProtoMessage ¶
func (*ServerToClient) ProtoMessage()
func (*ServerToClient) Reset ¶
func (m *ServerToClient) Reset()
func (*ServerToClient) String ¶
func (m *ServerToClient) String() string
func (*ServerToClient) XXX_DiscardUnknown ¶
func (m *ServerToClient) XXX_DiscardUnknown()
func (*ServerToClient) XXX_Marshal ¶
func (m *ServerToClient) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*ServerToClient) XXX_Merge ¶
func (dst *ServerToClient) XXX_Merge(src proto.Message)
func (*ServerToClient) XXX_OneofFuncs ¶
func (*ServerToClient) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{})
XXX_OneofFuncs is for the internal use of the proto package.
func (*ServerToClient) XXX_Size ¶
func (m *ServerToClient) XXX_Size() int
func (*ServerToClient) XXX_Unmarshal ¶
func (m *ServerToClient) XXX_Unmarshal(b []byte) error
type ServerToClient_CloseStream ¶
type ServerToClient_CloseStream struct {
CloseStream *CloseStream `protobuf:"bytes,5,opt,name=close_stream,json=closeStream,proto3,oneof"`
}
type ServerToClient_MoreResponseData ¶
type ServerToClient_MoreResponseData struct {
MoreResponseData []byte `protobuf:"bytes,4,opt,name=more_response_data,json=moreResponseData,proto3,oneof"`
}
type ServerToClient_ResponseHeaders ¶
type ServerToClient_ResponseHeaders struct {
ResponseHeaders *Metadata `protobuf:"bytes,2,opt,name=response_headers,json=responseHeaders,proto3,oneof"`
}
type ServerToClient_ResponseMessage ¶
type ServerToClient_ResponseMessage struct {
ResponseMessage *MessageData `protobuf:"bytes,3,opt,name=response_message,json=responseMessage,proto3,oneof"`
}
type StreamAdapter ¶
type StreamAdapter struct {
// contains filtered or unexported fields
}
StreamAdapter wraps a grpc.Stream and implements a simple request-response mechanism on top of it.
func NewStreamAdapter ¶
func NewStreamAdapter(stream grpc.Stream, requestKeyExtractor, responseKeyExtractor func(interface{}) interface{}, responseFactory func() interface{}) *StreamAdapter
NewStreamAdapter returns a stream adapter that correlates responses with their corresponding requests via the given key extractor functions. If either function is nil, both must be nil (in which case the extracted key is the nil interface, meaning everything must be correlated via FIFO order).
A request message is supplied to requestKeyExtractor to determine the key. When a response message is received, the given responseKeyExtractor is consulted. The resulting response key is expected to match the key of a pending request. If a message is received that does not correspond to any pending request, the stream is closed and all pending operations fail with a CorruptResponseStreamError.
If there are multiple pending requests for a particular key, they are serviced in FIFO order. So if two requests map to the same key, the first matching response is assumed to correspond to the first such request, and so on.
The given responseFactory is used to create new instances into which response data is unmarshaled. If it is nil, the given stream must have a Recv method that accepts no arguments and returns two: a response message and an error. The return type will be examined (via reflection) and values of the right type will be created via reflection. Stream types generated by protoc for use with normal gRPC streaming stubs will have such a method and thus can be used so that calling code need not explicitly supply a factory.
This function is used on the client side of the stream -- the side that will send requests and then expects responses in reply. (This does not necessarily have to be a gRPC client though, as a stream could allow servers to initiate requests this way). For the server side of the stream, see HandleServerStream.
func (*StreamAdapter) Call ¶
func (a *StreamAdapter) Call(ctx context.Context, req interface{}) (interface{}, error)
Call performs a single request-response round trip. It sends the given request on the stream and waits for a corresponding response, which is returned to the caller. It returns a non-nil error in the event of a failure.
If the given context contains request metadata, it is ignored. Also, as observed in the signature, gRPC call options are not supported. The context is only used to allow the call to return early in the event of context completion (in which case the returned error will be the context error). If the context times out or is cancelled, nothing happens in the underlying stream, and any response message that eventually arrives will effectively be ignored.
func (*StreamAdapter) Context ¶
func (a *StreamAdapter) Context() context.Context
type TunnelChannel ¶
type TunnelChannel struct {
// contains filtered or unexported fields
}
func NewChannel ¶
func NewChannel(stream TunnelService_OpenTunnelClient) *TunnelChannel
func (*TunnelChannel) Close ¶
func (c *TunnelChannel) Close()
func (*TunnelChannel) Context ¶
func (c *TunnelChannel) Context() context.Context
func (*TunnelChannel) Done ¶
func (c *TunnelChannel) Done() <-chan struct{}
func (*TunnelChannel) Err ¶
func (c *TunnelChannel) Err() error
func (*TunnelChannel) Invoke ¶
func (c *TunnelChannel) Invoke(ctx context.Context, methodName string, req, resp interface{}, opts ...grpc.CallOption) error
func (*TunnelChannel) IsDone ¶
func (c *TunnelChannel) IsDone() bool
func (*TunnelChannel) NewStream ¶
func (c *TunnelChannel) NewStream(ctx context.Context, desc *grpc.StreamDesc, methodName string, opts ...grpc.CallOption) (grpc.ClientStream, error)
type TunnelServer ¶
type TunnelServer struct {
// If set, reverse tunnels will not be allowed. The server will reply to
// OpenReverseTunnel requests with an "Unimplemented" error code.
NoReverseTunnels bool
// If reverse tunnels are allowed, this callback may be configured to
// receive information when clients open a reverse tunnel.
OnReverseTunnelConnect func(*ReverseTunnelChannel)
// If reverse tunnels are allowed, this callback may be configured to
// receive information when reverse tunnels are torn down.
OnReverseTunnelDisconnect func(*ReverseTunnelChannel)
// Optional function that accepts a reverse tunnel and returns an affinity
// key. The affinity key values can be used to look up outbound channels,
// for targeting calls to particular clients or groups of clients.
AffinityKey func(*ReverseTunnelChannel) interface{}
// contains filtered or unexported fields
}
TunnelServer provides an implementation for grpctunnel.TunnelServiceServer. You can register handlers with it, and it will then expose those handlers for incoming tunnels. If no handlers are registered, the server will reply to OpenTunnel requests with an "Unimplemented" error code. The server may still be used for reverse tunnels.
For reverse tunnels, if supported, all connected channels (e.g. all clients that have created reverse tunnels) are available. You can also configure a listener to receive notices when channels are connected and disconnected.
func (*TunnelServer) AllReverseTunnels ¶
func (s *TunnelServer) AllReverseTunnels() []*ReverseTunnelChannel
func (*TunnelServer) AsChannel ¶
func (s *TunnelServer) AsChannel() grpchan.Channel
func (*TunnelServer) FindChannel ¶
func (s *TunnelServer) FindChannel(search func(*ReverseTunnelChannel) bool) *ReverseTunnelChannel
func (*TunnelServer) KeyAsChannel ¶
func (s *TunnelServer) KeyAsChannel(key interface{}) grpchan.Channel
func (*TunnelServer) OpenReverseTunnel ¶
func (s *TunnelServer) OpenReverseTunnel(stream TunnelService_OpenReverseTunnelServer) error
func (*TunnelServer) OpenTunnel ¶
func (s *TunnelServer) OpenTunnel(stream TunnelService_OpenTunnelServer) error
func (*TunnelServer) RegisterService ¶
func (s *TunnelServer) RegisterService(desc *grpc.ServiceDesc, srv interface{})
type TunnelServiceClient ¶
type TunnelServiceClient interface {
// OpenTunnel creates a channel to the server which can be used to send
// additional RPCs, all of which will be sent to the same server via a
// single underlying gRPC stream. This can provide affinity for a "chatty"
// sequence of calls, where the gRPC connection is load balanced (so there
// may be multiple backend servers), but a particular "conversation" (which
// may consist of numerous RPCs) needs to all go to a single server, for
// consistency.
OpenTunnel(ctx context.Context, opts ...grpc.CallOption) (TunnelService_OpenTunnelClient, error)
// OpenReverseTunnel creates a "reverse" channel, which allows the server to
// act as a client and send RPCs to the client that creates the tunnel. It
// is in most respects identical to OpenTunnel except that the roles are
// reversed: the server initiates RPCs and sends requests and the client
// replies to them and sends responses.
OpenReverseTunnel(ctx context.Context, opts ...grpc.CallOption) (TunnelService_OpenReverseTunnelClient, error)
}
TunnelServiceClient is the client API for TunnelService service.
For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
func NewTunnelServiceClient ¶
func NewTunnelServiceClient(cc *grpc.ClientConn) TunnelServiceClient
type TunnelServiceServer ¶
type TunnelServiceServer interface {
// OpenTunnel creates a channel to the server which can be used to send
// additional RPCs, all of which will be sent to the same server via a
// single underlying gRPC stream. This can provide affinity for a "chatty"
// sequence of calls, where the gRPC connection is load balanced (so there
// may be multiple backend servers), but a particular "conversation" (which
// may consist of numerous RPCs) needs to all go to a single server, for
// consistency.
OpenTunnel(TunnelService_OpenTunnelServer) error
// OpenReverseTunnel creates a "reverse" channel, which allows the server to
// act as a client and send RPCs to the client that creates the tunnel. It
// is in most respects identical to OpenTunnel except that the roles are
// reversed: the server initiates RPCs and sends requests and the client
// replies to them and sends responses.
OpenReverseTunnel(TunnelService_OpenReverseTunnelServer) error
}
TunnelServiceServer is the server API for TunnelService service.
type TunnelService_OpenReverseTunnelClient ¶
type TunnelService_OpenReverseTunnelClient interface {
Send(*ServerToClient) error
Recv() (*ClientToServer, error)
grpc.ClientStream
}
type TunnelService_OpenReverseTunnelServer ¶
type TunnelService_OpenReverseTunnelServer interface {
Send(*ClientToServer) error
Recv() (*ServerToClient, error)
grpc.ServerStream
}
type TunnelService_OpenTunnelClient ¶
type TunnelService_OpenTunnelClient interface {
Send(*ClientToServer) error
Recv() (*ServerToClient, error)
grpc.ClientStream
}
type TunnelService_OpenTunnelServer ¶
type TunnelService_OpenTunnelServer interface {
Send(*ServerToClient) error
Recv() (*ClientToServer, error)
grpc.ServerStream
}