Documentation
¶
Overview ¶
Package rpcwebrtc providers client/server functionality for gRPC serviced over WebRTC data channels. The work is adapted from https://github.com/jsmouret/grpc-over-webrtc.
Index ¶
- Constants
- Variables
- func ErrorToStatus(err error) *status.Status
- type CallAnswer
- type CallOffer
- type CallOfferResponder
- type CallQueue
- type ClientChannel
- func (ch *ClientChannel) Close() error
- func (ch ClientChannel) Closed() (bool, error)
- func (ch *ClientChannel) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, ...) error
- func (ch *ClientChannel) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, ...) (grpc.ClientStream, error)
- func (ch ClientChannel) Ready() <-chan struct{}
- type ClientStream
- func (s ClientStream) CloseRecv()
- func (s *ClientStream) CloseSend() error
- func (s ClientStream) Closed() bool
- func (s *ClientStream) Context() context.Context
- func (s *ClientStream) Header() (metadata.MD, error)
- func (s ClientStream) RecvMsg(m interface{}) error
- func (s *ClientStream) SendMsg(m interface{}) error
- func (s *ClientStream) Trailer() metadata.MD
- type MemoryCallQueue
- type MongoDBCallQueue
- type Server
- type ServerChannel
- type ServerStream
- func (s ServerStream) CloseRecv()
- func (s ServerStream) Closed() bool
- func (s ServerStream) Context() context.Context
- func (s ServerStream) RecvMsg(m interface{}) error
- func (s *ServerStream) SendHeader(header metadata.MD) error
- func (s *ServerStream) SendMsg(m interface{}) (err error)
- func (s *ServerStream) SetHeader(header metadata.MD) error
- func (s *ServerStream) SetTrailer(trailer metadata.MD)
- type SignalingAnswerer
- type SignalingServer
Constants ¶
const RPCHostMetadataField = "rpc-host"
RPCHostMetadataField is the identifier of a host.
Variables ¶
var ( MongoDBCallQueueDBName = "rpc" MongoDBCallQueueCollName = "calls" )
Database and collection names used by the MongoDBCallQueue.
var DefaultMaxGRPCCalls = 256
DefaultMaxGRPCCalls is the maximum number of concurrent gRPC calls to allow for a server.
var DefaultWebRTCConfiguration = webrtc.Configuration{ ICEServers: gostream.DefaultICEServers, }
DefaultWebRTCConfiguration is the standard configuration used for WebRTC peers.
var ( // ErrIllegalHeaderWrite indicates that setting header is illegal because of // the state of the stream. ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called") )
var ErrNoSignaler = errors.New("no signaler present")
ErrNoSignaler happens if a gRPC request is made on a server that does not support signaling for WebRTC.
var MaxMessageSize = 1 << 24
MaxMessageSize is the maximum size a gRPC message can be.
var (
// MaxStreamCount is the max number of streams a channel can have.
MaxStreamCount = 256
)
Functions ¶
func ErrorToStatus ¶
ErrorToStatus converts an error to a gRPC status. A nil error becomes a successful status.
Types ¶
type CallAnswer ¶
CallAnswer is the response to an offer. An agreement to start the call will contain an SDP about how the answerer wishes to speak.
type CallOffer ¶
type CallOffer interface { // The SDP contains information the caller wants to tell the answerer about. SDP() string }
CallOffer contains the information needed to offer to start a call.
type CallOfferResponder ¶
type CallOfferResponder interface { CallOffer // Respond responds to the associated call offer with the given answer which contains // the SDP of the answerer or an error. Respond(ctx context.Context, ans CallAnswer) error }
A CallOfferResponder is used by an answerer to respond to a call offer with an answer.
type CallQueue ¶
type CallQueue interface { // SendOffer sends an offer associated with the given SDP to the given host. SendOffer(ctx context.Context, host, sdp string) (string, error) // RecvOffer receives the next offer for the given host. It should respond with an answer // once a decision is made. RecvOffer(ctx context.Context, host string) (CallOfferResponder, error) }
A CallQueue handles the transmission and reception of call offers. For every sending of an offer done, it is expected that there is someone to receive that offer and subsequently respond to it.
type ClientChannel ¶
type ClientChannel struct {
// contains filtered or unexported fields
}
A ClientChannel reflects the client end of a gRPC connection serviced over a WebRTC data channel.
func Dial ¶
func Dial(ctx context.Context, address string, insecure bool, logger golog.Logger) (ch *ClientChannel, err error)
Dial connects to the signaling service at the given address and attempts to establish a WebRTC connection with the corresponding peer reflected in the address.
func NewClientChannel ¶
func NewClientChannel( peerConn *webrtc.PeerConnection, dataChannel *webrtc.DataChannel, logger golog.Logger, ) *ClientChannel
NewClientChannel wraps the given WebRTC data channel to be used as the client end of a gRPC connection.
func (*ClientChannel) Close ¶
func (ch *ClientChannel) Close() error
Close closes all streams and the underlying channel.
func (*ClientChannel) Invoke ¶
func (ch *ClientChannel) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error
Invoke sends the RPC request on the wire and returns after response is received. This is typically called by generated code.
All errors returned by Invoke are compatible with the status package.
func (*ClientChannel) NewStream ¶
func (ch *ClientChannel) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error)
NewStream creates a new Stream for the client side. This is typically called by generated code. ctx is used for the lifetime of the stream.
To ensure resources are not leaked due to the stream returned, one of the following actions must be performed:
- Call Close on the ClientConn.
- Cancel the context provided.
- Call RecvMsg until a non-nil error is returned. A protobuf-generated client-streaming RPC, for instance, might use the helper function CloseAndRecv (note that CloseSend does not Recv, therefore is not guaranteed to release all resources).
- Receive a non-nil, non-io.EOF error from Header or SendMsg.
If none of the above happen, a goroutine and a context will be leaked, and grpc will not call the optionally-configured stats handler with a stats.End message.
type ClientStream ¶
type ClientStream struct {
// contains filtered or unexported fields
}
A ClientStream is the high level gRPC streaming interface used for both unary and streaming call requests.
func NewClientStream ¶
func NewClientStream( ctx context.Context, channel *ClientChannel, stream *webrtcpb.Stream, onDone func(id uint64), logger golog.Logger, ) *ClientStream
NewClientStream creates a gRPC stream from the given client channel with a unique identity in order to be able to recognize responses on a single underlying data channel.
func (*ClientStream) CloseSend ¶
func (s *ClientStream) CloseSend() error
CloseSend closes the send direction of the stream. It closes the stream when non-nil error is met. It is also not safe to call CloseSend concurrently with SendMsg.
func (*ClientStream) Context ¶
func (s *ClientStream) Context() context.Context
Context returns the context for this stream.
It should not be called until after Header or RecvMsg has returned. Once called, subsequent client-side retries are disabled.
func (*ClientStream) Header ¶
func (s *ClientStream) Header() (metadata.MD, error)
Header returns the header metadata received from the server if there is any. It blocks if the metadata is not ready to read.
func (ClientStream) RecvMsg ¶
func (s ClientStream) RecvMsg(m interface{}) error
RecvMsg blocks until it receives a message into m or the stream is done. It returns io.EOF when the stream completes successfully. On any other error, the stream is aborted and the error contains the RPC status.
It is safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time, but it is not safe to call RecvMsg on the same stream in different goroutines.
func (*ClientStream) SendMsg ¶
func (s *ClientStream) SendMsg(m interface{}) error
SendMsg is generally called by generated code. On error, SendMsg aborts the stream. If the error was generated by the client, the status is returned directly; otherwise, io.EOF is returned and the status of the stream may be discovered using RecvMsg.
SendMsg blocks until:
- There is sufficient flow control to schedule m with the transport, or
- The stream is done, or
- The stream breaks.
SendMsg does not wait until the message is received by the server. An untimely stream closure may result in lost messages. To ensure delivery, users should ensure the RPC completed successfully using RecvMsg.
It is safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time, but it is not safe to call SendMsg on the same stream in different goroutines. It is also not safe to call CloseSend concurrently with SendMsg.
func (*ClientStream) Trailer ¶
func (s *ClientStream) Trailer() metadata.MD
Trailer returns the trailer metadata from the server, if there is any. It must only be called after stream.CloseAndRecv has returned, or stream.Recv has returned a non-nil error (including io.EOF).
type MemoryCallQueue ¶
type MemoryCallQueue struct {
// contains filtered or unexported fields
}
A MemoryCallQueue is an in-memory implementation of a call queue designed to be used for testing and single node deployments.
func NewMemoryCallQueue ¶
func NewMemoryCallQueue() *MemoryCallQueue
NewMemoryCallQueue returns a new, empty in-memory call queue.
func (*MemoryCallQueue) RecvOffer ¶
func (queue *MemoryCallQueue) RecvOffer(ctx context.Context, host string) (CallOfferResponder, error)
RecvOffer receives the next offer for the given host. It should respond with an answer once a decision is made.
type MongoDBCallQueue ¶
type MongoDBCallQueue struct {
// contains filtered or unexported fields
}
A MongoDBCallQueue is an MongoDB implementation of a call queue designed to be used for multi-node, distributed deployments.
func NewMongoDBCallQueue ¶
func NewMongoDBCallQueue(client *mongo.Client) (*MongoDBCallQueue, error)
NewMongoDBCallQueue returns a new MongoDB based call queue where calls are transferred through the given client. TODO(https://github.com/viamrobotics/core/issues/108): more efficient, multiplexed change streams; uniquely identify host ephemerally TODO(https://github.com/viamrobotics/core/issues/109): max queue size
func (*MongoDBCallQueue) RecvOffer ¶
func (queue *MongoDBCallQueue) RecvOffer(ctx context.Context, host string) (CallOfferResponder, error)
RecvOffer receives the next offer for the given host. It should respond with an answer once a decision is made.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
A Server translates gRPC frames over WebRTC data channels into gRPC calls.
func NewServerWithInterceptors ¶
func NewServerWithInterceptors( logger golog.Logger, unaryInt grpc.UnaryServerInterceptor, streamInt grpc.StreamServerInterceptor, ) *Server
NewServerWithInterceptors makes a new server with no registered services that will use the given interceptors.
func (*Server) NewChannel ¶
func (srv *Server) NewChannel(peerConn *webrtc.PeerConnection, dataChannel *webrtc.DataChannel) *ServerChannel
NewChannel binds the given data channel to be serviced as the server end of a gRPC connection.
func (*Server) RegisterService ¶
func (srv *Server) RegisterService(sd *grpc.ServiceDesc, ss interface{})
RegisterService registers the given implementation of a service to be handled via WebRTC data channels. It extracts the unary and stream methods from a service description and calls the methods on the implementation when requested via a data channel.
type ServerChannel ¶
type ServerChannel struct {
// contains filtered or unexported fields
}
A ServerChannel reflects the server end of a gRPC connection serviced over a WebRTC data channel.
func NewServerChannel ¶
func NewServerChannel( server *Server, peerConn *webrtc.PeerConnection, dataChannel *webrtc.DataChannel, logger golog.Logger, ) *ServerChannel
NewServerChannel wraps the given WebRTC data channel to be used as the server end of a gRPC connection.
type ServerStream ¶
type ServerStream struct {
// contains filtered or unexported fields
}
A ServerStream is the high level gRPC streaming interface used for handling both unary and streaming call responses.
func (ServerStream) RecvMsg ¶
func (s ServerStream) RecvMsg(m interface{}) error
RecvMsg blocks until it receives a message into m or the stream is done. It returns io.EOF when the stream completes successfully. On any other error, the stream is aborted and the error contains the RPC status.
It is safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time, but it is not safe to call RecvMsg on the same stream in different goroutines.
func (*ServerStream) SendHeader ¶
func (s *ServerStream) SendHeader(header metadata.MD) error
SendHeader sends the header metadata. The provided md and headers set by SetHeader() will be sent. It fails if called multiple times.
func (*ServerStream) SendMsg ¶
func (s *ServerStream) SendMsg(m interface{}) (err error)
SendMsg sends a message. On error, SendMsg aborts the stream and the error is returned directly.
SendMsg blocks until:
- There is sufficient flow control to schedule m with the transport, or
- The stream is done, or
- The stream breaks.
SendMsg does not wait until the message is received by the client. An untimely stream closure may result in lost messages.
It is safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time, but it is not safe to call SendMsg on the same stream in different goroutines.
func (*ServerStream) SetHeader ¶
func (s *ServerStream) SetHeader(header metadata.MD) error
SetHeader sets the header metadata. It may be called multiple times. When call multiple times, all the provided metadata will be merged. All the metadata will be sent out when one of the following happens:
- ServerStream.SendHeader() is called;
- The first response is sent out;
- An RPC status is sent out (error or success).
func (*ServerStream) SetTrailer ¶
func (s *ServerStream) SetTrailer(trailer metadata.MD)
SetTrailer sets the trailer metadata which will be sent with the RPC status. When called more than once, all the provided metadata will be merged.
type SignalingAnswerer ¶
type SignalingAnswerer struct {
// contains filtered or unexported fields
}
A SignalingAnswerer listens for and answers calls with a given signaling service. It is directly connected to a Server that will handle the actual calls/connections over WebRTC data channels.
func NewSignalingAnswerer ¶
func NewSignalingAnswerer(address, host string, server *Server, insecure bool, logger golog.Logger) *SignalingAnswerer
NewSignalingAnswerer makes an answerer that will connect to and listen for calls at the given address. Note that using this assumes that the connection at the given address is secure and assumed that all calls are authenticated. Random ports will be opened on this host to establish connections as a means to service ICE (https://webrtcforthecurious.com/docs/03-connecting/#how-does-it-work).
func (*SignalingAnswerer) Start ¶
func (ans *SignalingAnswerer) Start() error
Start connects to the signaling service and listens forever until instructed to stop via Stop.
func (*SignalingAnswerer) Stop ¶
func (ans *SignalingAnswerer) Stop()
Stop waits for the answer to stop listening and return.
type SignalingServer ¶
type SignalingServer struct { webrtcpb.UnimplementedSignalingServiceServer // contains filtered or unexported fields }
A SignalingServer implements a signaling service for WebRTC by exchanging SDPs (https://webrtcforthecurious.com/docs/02-signaling/#what-is-the-session-description-protocol-sdp) via gRPC. The service consists of a many-to-many interaction where there are many callers and many answerers. The callers provide an SDP to the service which asks a corresponding waiting answerer to provide an SDP in exchange in order to establish a P2P connection between the two parties.
func NewSignalingServer ¶
func NewSignalingServer(callQueue CallQueue) *SignalingServer
NewSignalingServer makes a new signaling server that uses an in memory call queue and looks routes based on a given robot host. TODO(https://github.com/viamrobotics/core/issues/79): abstraction to be able to use MongoDB as a distributed call queue. This will enable many signaling services to run acting as effectively operators on as switchboard.
func (*SignalingServer) Answer ¶
func (srv *SignalingServer) Answer(server webrtcpb.SignalingService_AnswerServer) error
Answer listens on call/offer queue forever responding with SDPs to agreed to calls. TODO(https://github.com/viamrobotics/core/issues/104): This should be authorized for robots only.
func (*SignalingServer) Call ¶
func (srv *SignalingServer) Call(ctx context.Context, req *webrtcpb.CallRequest) (*webrtcpb.CallResponse, error)
Call is a request/offer to start a caller with the connected answerer.