rpcwebrtc

package
Version: v0.0.1
Warning: This package is not in the latest version of its module.
Published: Jun 23, 2021 | License: MIT | Imports: 30 | Imported by: 0

Documentation

Overview

Package rpcwebrtc provides client/server functionality for gRPC serviced over WebRTC data channels. The work is adapted from https://github.com/jsmouret/grpc-over-webrtc.

Constants

const RPCHostMetadataField = "rpc-host"

RPCHostMetadataField is the identifier of a host.

Variables

var (
	MongoDBCallQueueDBName   = "rpc"
	MongoDBCallQueueCollName = "calls"
)

Database and collection names used by the MongoDBCallQueue.

var DefaultMaxGRPCCalls = 256

DefaultMaxGRPCCalls is the maximum number of concurrent gRPC calls to allow for a server.

var DefaultWebRTCConfiguration = webrtc.Configuration{
	ICEServers: gostream.DefaultICEServers,
}

DefaultWebRTCConfiguration is the standard configuration used for WebRTC peers.

var (
	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
	// the state of the stream.
	ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
)
var ErrNoSignaler = errors.New("no signaler present")

ErrNoSignaler happens if a gRPC request is made on a server that does not support signaling for WebRTC.

var MaxMessageSize = 1 << 24

MaxMessageSize is the maximum size a gRPC message can be.
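As a rough illustration of what the limit means in practice, the sketch below checks an encoded message against the same value (1 << 24, that is 16,777,216). It is not the package's own enforcement code: `maxMessageSize`, `errMessageTooLarge`, and `checkSize` are hypothetical names, and treating the unit as bytes with an inclusive limit is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// maxMessageSize mirrors the MaxMessageSize value above: 1 << 24 = 16,777,216.
// Treating the unit as bytes is an assumption of this sketch.
var maxMessageSize = 1 << 24

// errMessageTooLarge is a hypothetical error, not one exported by the package.
var errMessageTooLarge = errors.New("message exceeds maximum size")

// checkSize returns nil when the encoded message is at most maxMessageSize
// long (an inclusive limit, by assumption) and a wrapped error otherwise.
func checkSize(encoded []byte) error {
	if len(encoded) > maxMessageSize {
		return fmt.Errorf("%w: %d > %d", errMessageTooLarge, len(encoded), maxMessageSize)
	}
	return nil
}

func main() {
	fmt.Println(checkSize(make([]byte, 1024)))
	fmt.Println(checkSize(make([]byte, maxMessageSize+1)))
}
```

A caller that expects larger payloads would presumably need to split them across several messages before sending.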

var (
	// MaxStreamCount is the max number of streams a channel can have.
	MaxStreamCount = 256
)

Functions

func ErrorToStatus

func ErrorToStatus(err error) *status.Status

ErrorToStatus converts an error to a gRPC status. A nil error becomes a successful status.

Types

type CallAnswer

type CallAnswer struct {
	SDP string
	Err error
}

CallAnswer is the response to an offer. An answer that agrees to start the call contains an SDP describing how the answerer wishes to communicate.

type CallOffer

type CallOffer interface {
	// SDP returns the session description the caller wants to communicate to the answerer.
	SDP() string
}

CallOffer contains the information needed to offer to start a call.

type CallOfferResponder

type CallOfferResponder interface {
	CallOffer

	// Respond responds to the associated call offer with the given answer which contains
	// the SDP of the answerer or an error.
	Respond(ctx context.Context, ans CallAnswer) error
}

A CallOfferResponder is used by an answerer to respond to a call offer with an answer.

type CallQueue

type CallQueue interface {
	// SendOffer sends an offer associated with the given SDP to the given host.
	SendOffer(ctx context.Context, host, sdp string) (string, error)

	// RecvOffer receives the next offer for the given host. The receiver should respond
	// with an answer once a decision is made.
	RecvOffer(ctx context.Context, host string) (CallOfferResponder, error)
}

A CallQueue handles the transmission and reception of call offers. For every offer sent, it is expected that someone receives that offer and subsequently responds to it.
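The send/receive/respond contract described above can be sketched with plain channels. This is a toy under stated assumptions, not the package's MemoryCallQueue: `toyQueue`, `chanOffer`, and `exchange` are hypothetical names, the types are redeclared locally so the example builds without the package, and the string returned by SendOffer is assumed to be the answerer's SDP.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// CallAnswer and CallOfferResponder are local copies of the declarations above
// (CallOffer is flattened into the responder) so the sketch builds on its own.
type CallAnswer struct {
	SDP string
	Err error
}

type CallOfferResponder interface {
	SDP() string
	Respond(ctx context.Context, ans CallAnswer) error
}

// chanOffer is a hypothetical offer that carries its answer back over a channel.
type chanOffer struct {
	sdp    string
	answer chan CallAnswer
}

func (o *chanOffer) SDP() string { return o.sdp }

func (o *chanOffer) Respond(ctx context.Context, ans CallAnswer) error {
	select {
	case o.answer <- ans:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// toyQueue is a hypothetical queue with one unbuffered channel per host.
type toyQueue struct {
	mu    sync.Mutex
	hosts map[string]chan *chanOffer
}

func newToyQueue() *toyQueue { return &toyQueue{hosts: map[string]chan *chanOffer{}} }

func (q *toyQueue) hostChan(host string) chan *chanOffer {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, ok := q.hosts[host]; !ok {
		q.hosts[host] = make(chan *chanOffer)
	}
	return q.hosts[host]
}

// SendOffer blocks until an answerer takes the offer and responds to it.
func (q *toyQueue) SendOffer(ctx context.Context, host, sdp string) (string, error) {
	offer := &chanOffer{sdp: sdp, answer: make(chan CallAnswer, 1)}
	select {
	case q.hostChan(host) <- offer:
	case <-ctx.Done():
		return "", ctx.Err()
	}
	select {
	case ans := <-offer.answer:
		return ans.SDP, ans.Err
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// RecvOffer blocks until a caller sends an offer for the host.
func (q *toyQueue) RecvOffer(ctx context.Context, host string) (CallOfferResponder, error) {
	select {
	case offer := <-q.hostChan(host):
		return offer, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// exchange pairs one answerer goroutine with one caller on the same host.
func exchange(q *toyQueue, host, offerSDP string) (string, error) {
	ctx := context.Background()
	go func() {
		offer, err := q.RecvOffer(ctx, host)
		if err != nil {
			return
		}
		_ = offer.Respond(ctx, CallAnswer{SDP: "answer-to-" + offer.SDP()})
	}()
	return q.SendOffer(ctx, host, offerSDP)
}

func main() {
	fmt.Println(exchange(newToyQueue(), "robot1", "offer-sdp"))
}
```

The unbuffered per-host channel makes the pairing explicit: a sender cannot proceed until some receiver for the same host has taken the offer, which matches the expectation that every offer has someone to receive and answer it.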

type ClientChannel

type ClientChannel struct {
	// contains filtered or unexported fields
}

A ClientChannel reflects the client end of a gRPC connection serviced over a WebRTC data channel.

func Dial

func Dial(ctx context.Context, address string, insecure bool, logger golog.Logger) (ch *ClientChannel, err error)

Dial connects to the signaling service at the given address and attempts to establish a WebRTC connection with the corresponding peer reflected in the address.

func NewClientChannel

func NewClientChannel(
	peerConn *webrtc.PeerConnection,
	dataChannel *webrtc.DataChannel,
	logger golog.Logger,
) *ClientChannel

NewClientChannel wraps the given WebRTC data channel to be used as the client end of a gRPC connection.

func (*ClientChannel) Close

func (ch *ClientChannel) Close() error

Close closes all streams and the underlying channel.

func (ClientChannel) Closed

func (ch ClientChannel) Closed() (bool, error)

func (*ClientChannel) Invoke

func (ch *ClientChannel) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error

Invoke sends the RPC request on the wire and returns after the response is received. This is typically called by generated code.

All errors returned by Invoke are compatible with the status package.

func (*ClientChannel) NewStream

func (ch *ClientChannel) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error)

NewStream creates a new Stream for the client side. This is typically called by generated code. ctx is used for the lifetime of the stream.

To ensure resources are not leaked due to the stream returned, one of the following actions must be performed:

  1. Call Close on the ClientChannel.
  2. Cancel the context provided.
  3. Call RecvMsg until a non-nil error is returned. A protobuf-generated client-streaming RPC, for instance, might use the helper function CloseAndRecv (note that CloseSend does not Recv, therefore is not guaranteed to release all resources).
  4. Receive a non-nil, non-io.EOF error from Header or SendMsg.

If none of the above happen, a goroutine and a context will be leaked, and grpc will not call the optionally-configured stats handler with a stats.End message.
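Option 3 in the list above (calling RecvMsg until it returns a non-nil error) can be sketched as a small drain loop. The sketch assumes nothing about this package's internals: `receiver` is a locally declared subset of the stream interface, and `fakeStream` and `drain` are hypothetical names used only so the example runs without gRPC or WebRTC.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// receiver is the one method of a client stream that this sketch needs,
// redeclared locally so the example builds without gRPC.
type receiver interface {
	RecvMsg(m interface{}) error
}

// fakeStream is a hypothetical stand-in that yields its messages and then io.EOF.
type fakeStream struct{ msgs []string }

func (f *fakeStream) RecvMsg(m interface{}) error {
	if len(f.msgs) == 0 {
		return io.EOF
	}
	out, ok := m.(*string)
	if !ok {
		return errors.New("fakeStream: m must be *string")
	}
	*out, f.msgs = f.msgs[0], f.msgs[1:]
	return nil
}

// drain calls RecvMsg until it returns a non-nil error. io.EOF is treated as
// successful completion; any other error is returned to the caller.
func drain(s receiver) ([]string, error) {
	var got []string
	for {
		var msg string
		err := s.RecvMsg(&msg)
		if errors.Is(err, io.EOF) {
			return got, nil
		}
		if err != nil {
			return got, err
		}
		got = append(got, msg)
	}
}

func main() {
	got, err := drain(&fakeStream{msgs: []string{"a", "b"}})
	fmt.Println(got, err)
}
```

With a real stream the same loop shape applies; the point is that the loop only exits once RecvMsg has reported an error, which is the condition the list names for releasing resources.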

func (ClientChannel) Ready

func (ch ClientChannel) Ready() <-chan struct{}

type ClientStream

type ClientStream struct {
	// contains filtered or unexported fields
}

A ClientStream is the high level gRPC streaming interface used for both unary and streaming call requests.

func NewClientStream

func NewClientStream(
	ctx context.Context,
	channel *ClientChannel,
	stream *webrtcpb.Stream,
	onDone func(id uint64),
	logger golog.Logger,
) *ClientStream

NewClientStream creates a gRPC stream from the given client channel with a unique identity in order to be able to recognize responses on a single underlying data channel.

func (ClientStream) CloseRecv

func (s ClientStream) CloseRecv()

func (*ClientStream) CloseSend

func (s *ClientStream) CloseSend() error

CloseSend closes the send direction of the stream. It closes the stream when a non-nil error is encountered. It is not safe to call CloseSend concurrently with SendMsg.

func (ClientStream) Closed

func (s ClientStream) Closed() bool

func (*ClientStream) Context

func (s *ClientStream) Context() context.Context

Context returns the context for this stream.

It should not be called until after Header or RecvMsg has returned. Once called, subsequent client-side retries are disabled.

func (*ClientStream) Header

func (s *ClientStream) Header() (metadata.MD, error)

Header returns the header metadata received from the server if there is any. It blocks if the metadata is not ready to read.

func (ClientStream) RecvMsg

func (s ClientStream) RecvMsg(m interface{}) error

RecvMsg blocks until it receives a message into m or the stream is done. It returns io.EOF when the stream completes successfully. On any other error, the stream is aborted and the error contains the RPC status.

It is safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time, but it is not safe to call RecvMsg on the same stream in different goroutines.

func (*ClientStream) SendMsg

func (s *ClientStream) SendMsg(m interface{}) error

SendMsg is generally called by generated code. On error, SendMsg aborts the stream. If the error was generated by the client, the status is returned directly; otherwise, io.EOF is returned and the status of the stream may be discovered using RecvMsg.

SendMsg blocks until:

  • There is sufficient flow control to schedule m with the transport, or
  • The stream is done, or
  • The stream breaks.

SendMsg does not wait until the message is received by the server. An untimely stream closure may result in lost messages. To ensure delivery, users should ensure the RPC completed successfully using RecvMsg.

It is safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time, but it is not safe to call SendMsg on the same stream in different goroutines. It is also not safe to call CloseSend concurrently with SendMsg.
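The concurrency rule above (one goroutine for SendMsg and CloseSend, another for RecvMsg) can be sketched as follows. `echoStream` is a hypothetical in-process stand-in, not this package's ClientStream, and `pump` is an illustrative helper; the stand-in only accepts `*string` messages and panics on anything else.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// echoStream is a hypothetical in-process stream: whatever is sent can be received.
type echoStream struct{ ch chan string }

func (s *echoStream) SendMsg(m interface{}) error {
	s.ch <- *(m.(*string))
	return nil
}

func (s *echoStream) CloseSend() error {
	close(s.ch)
	return nil
}

func (s *echoStream) RecvMsg(m interface{}) error {
	v, ok := <-s.ch
	if !ok {
		return io.EOF
	}
	*(m.(*string)) = v
	return nil
}

// pump sends msgs from one goroutine and receives them in another, so that
// SendMsg/CloseSend and RecvMsg are each confined to a single goroutine.
func pump(s *echoStream, msgs []string) []string {
	var wg sync.WaitGroup
	var got []string
	wg.Add(2)
	go func() { // the only goroutine calling SendMsg and CloseSend
		defer wg.Done()
		defer s.CloseSend()
		for i := range msgs {
			if err := s.SendMsg(&msgs[i]); err != nil {
				return
			}
		}
	}()
	go func() { // the only goroutine calling RecvMsg
		defer wg.Done()
		for {
			var m string
			if err := s.RecvMsg(&m); err != nil {
				return
			}
			got = append(got, m)
		}
	}()
	wg.Wait()
	return got
}

func main() {
	fmt.Println(pump(&echoStream{ch: make(chan string)}, []string{"x", "y", "z"}))
}
```

Calling CloseSend from the sending goroutine, after the last SendMsg, avoids the unsafe case of CloseSend running concurrently with SendMsg.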

func (*ClientStream) Trailer

func (s *ClientStream) Trailer() metadata.MD

Trailer returns the trailer metadata from the server, if there is any. It must only be called after stream.CloseAndRecv has returned, or stream.Recv has returned a non-nil error (including io.EOF).

type MemoryCallQueue

type MemoryCallQueue struct {
	// contains filtered or unexported fields
}

A MemoryCallQueue is an in-memory implementation of a call queue designed to be used for testing and single node deployments.

func NewMemoryCallQueue

func NewMemoryCallQueue() *MemoryCallQueue

NewMemoryCallQueue returns a new, empty in-memory call queue.

func (*MemoryCallQueue) RecvOffer

func (queue *MemoryCallQueue) RecvOffer(ctx context.Context, host string) (CallOfferResponder, error)

RecvOffer receives the next offer for the given host. The receiver should respond with an answer once a decision is made.

func (*MemoryCallQueue) SendOffer

func (queue *MemoryCallQueue) SendOffer(ctx context.Context, host, sdp string) (string, error)

SendOffer sends an offer associated with the given SDP to the given host.

type MongoDBCallQueue

type MongoDBCallQueue struct {
	// contains filtered or unexported fields
}

A MongoDBCallQueue is a MongoDB implementation of a call queue designed to be used for multi-node, distributed deployments.

func NewMongoDBCallQueue

func NewMongoDBCallQueue(client *mongo.Client) (*MongoDBCallQueue, error)

NewMongoDBCallQueue returns a new MongoDB-based call queue where calls are transferred through the given client.

TODO(https://github.com/viamrobotics/core/issues/108): more efficient, multiplexed change streams; uniquely identify host ephemerally.

TODO(https://github.com/viamrobotics/core/issues/109): max queue size.

func (*MongoDBCallQueue) RecvOffer

func (queue *MongoDBCallQueue) RecvOffer(ctx context.Context, host string) (CallOfferResponder, error)

RecvOffer receives the next offer for the given host. The receiver should respond with an answer once a decision is made.

func (*MongoDBCallQueue) SendOffer

func (queue *MongoDBCallQueue) SendOffer(ctx context.Context, host, sdp string) (string, error)

SendOffer sends an offer associated with the given SDP to the given host.

type Server

type Server struct {
	// contains filtered or unexported fields
}

A Server translates gRPC frames over WebRTC data channels into gRPC calls.

func NewServer

func NewServer(logger golog.Logger) *Server

NewServer makes a new server with no registered services.

func NewServerWithInterceptors

func NewServerWithInterceptors(
	logger golog.Logger,
	unaryInt grpc.UnaryServerInterceptor,
	streamInt grpc.StreamServerInterceptor,
) *Server

NewServerWithInterceptors makes a new server with no registered services that will use the given interceptors.

func (*Server) NewChannel

func (srv *Server) NewChannel(peerConn *webrtc.PeerConnection, dataChannel *webrtc.DataChannel) *ServerChannel

NewChannel binds the given data channel to be serviced as the server end of a gRPC connection.

func (*Server) RegisterService

func (srv *Server) RegisterService(sd *grpc.ServiceDesc, ss interface{})

RegisterService registers the given implementation of a service to be handled via WebRTC data channels. It extracts the unary and stream methods from a service description and calls the methods on the implementation when requested via a data channel.

func (*Server) Stop

func (srv *Server) Stop()

Stop instructs the server and all handlers to stop. It returns when all handlers are done executing.

type ServerChannel

type ServerChannel struct {
	// contains filtered or unexported fields
}

A ServerChannel reflects the server end of a gRPC connection serviced over a WebRTC data channel.

func NewServerChannel

func NewServerChannel(
	server *Server,
	peerConn *webrtc.PeerConnection,
	dataChannel *webrtc.DataChannel,
	logger golog.Logger,
) *ServerChannel

NewServerChannel wraps the given WebRTC data channel to be used as the server end of a gRPC connection.

func (ServerChannel) Close

func (ch ServerChannel) Close() error

func (ServerChannel) Closed

func (ch ServerChannel) Closed() (bool, error)

func (ServerChannel) Ready

func (ch ServerChannel) Ready() <-chan struct{}

type ServerStream

type ServerStream struct {
	// contains filtered or unexported fields
}

A ServerStream is the high level gRPC streaming interface used for handling both unary and streaming call responses.

func (ServerStream) CloseRecv

func (s ServerStream) CloseRecv()

func (ServerStream) Closed

func (s ServerStream) Closed() bool

func (ServerStream) Context

func (s ServerStream) Context() context.Context

Context returns the context for this stream.

func (ServerStream) RecvMsg

func (s ServerStream) RecvMsg(m interface{}) error

RecvMsg blocks until it receives a message into m or the stream is done. It returns io.EOF when the stream completes successfully. On any other error, the stream is aborted and the error contains the RPC status.

It is safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time, but it is not safe to call RecvMsg on the same stream in different goroutines.

func (*ServerStream) SendHeader

func (s *ServerStream) SendHeader(header metadata.MD) error

SendHeader sends the header metadata. The provided header and any headers set by SetHeader() will be sent. It fails if called multiple times.

func (*ServerStream) SendMsg

func (s *ServerStream) SendMsg(m interface{}) (err error)

SendMsg sends a message. On error, SendMsg aborts the stream and the error is returned directly.

SendMsg blocks until:

  • There is sufficient flow control to schedule m with the transport, or
  • The stream is done, or
  • The stream breaks.

SendMsg does not wait until the message is received by the client. An untimely stream closure may result in lost messages.

It is safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time, but it is not safe to call SendMsg on the same stream in different goroutines.

func (*ServerStream) SetHeader

func (s *ServerStream) SetHeader(header metadata.MD) error

SetHeader sets the header metadata. It may be called multiple times. When called multiple times, all the provided metadata will be merged. All the metadata will be sent out when one of the following happens:

  • ServerStream.SendHeader() is called;
  • The first response is sent out;
  • An RPC status is sent out (error or success).
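The merging behavior described above can be illustrated with a small sketch. It assumes that merging appends values per key in call order, as gRPC's own metadata joining does; whether this package merges in exactly that way is not stated here. `MD` is a local type with the same shape as gRPC's metadata.MD (a `map[string][]string`), and `merge` is a hypothetical helper.

```go
package main

import "fmt"

// MD has the same shape as gRPC's metadata.MD (map[string][]string); it is
// declared locally so the sketch builds without the gRPC dependency.
type MD map[string][]string

// merge returns a new MD containing the values of every input, appended per
// key in the order the inputs were given. Append semantics are an assumption.
func merge(mds ...MD) MD {
	out := MD{}
	for _, md := range mds {
		for k, vs := range md {
			out[k] = append(out[k], vs...)
		}
	}
	return out
}

func main() {
	first := MD{"trace-id": {"abc"}}
	second := MD{"trace-id": {"def"}, "region": {"us-east"}}
	// Two SetHeader calls would, under this assumption, yield the merged result.
	fmt.Println(merge(first, second))
}
```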

func (*ServerStream) SetTrailer

func (s *ServerStream) SetTrailer(trailer metadata.MD)

SetTrailer sets the trailer metadata which will be sent with the RPC status. When called more than once, all the provided metadata will be merged.

type SignalingAnswerer

type SignalingAnswerer struct {
	// contains filtered or unexported fields
}

A SignalingAnswerer listens for and answers calls with a given signaling service. It is directly connected to a Server that will handle the actual calls/connections over WebRTC data channels.

func NewSignalingAnswerer

func NewSignalingAnswerer(address, host string, server *Server, insecure bool, logger golog.Logger) *SignalingAnswerer

NewSignalingAnswerer makes an answerer that will connect to and listen for calls at the given address. Note that using this assumes that the connection at the given address is secure and that all calls are authenticated. Random ports will be opened on this host to establish connections as a means to service ICE (https://webrtcforthecurious.com/docs/03-connecting/#how-does-it-work).

func (*SignalingAnswerer) Start

func (ans *SignalingAnswerer) Start() error

Start connects to the signaling service and listens forever until instructed to stop via Stop.

func (*SignalingAnswerer) Stop

func (ans *SignalingAnswerer) Stop()

Stop waits for the answerer to stop listening and return.

type SignalingServer

type SignalingServer struct {
	webrtcpb.UnimplementedSignalingServiceServer
	// contains filtered or unexported fields
}

A SignalingServer implements a signaling service for WebRTC by exchanging SDPs (https://webrtcforthecurious.com/docs/02-signaling/#what-is-the-session-description-protocol-sdp) via gRPC. The service consists of a many-to-many interaction where there are many callers and many answerers. The callers provide an SDP to the service which asks a corresponding waiting answerer to provide an SDP in exchange in order to establish a P2P connection between the two parties.

func NewSignalingServer

func NewSignalingServer(callQueue CallQueue) *SignalingServer

NewSignalingServer makes a new signaling server that uses the given call queue and looks up routes based on a given robot host.

TODO(https://github.com/viamrobotics/core/issues/79): abstraction to be able to use MongoDB as a distributed call queue. This will enable many signaling services to run, effectively acting as operators on a switchboard.

func (*SignalingServer) Answer

Answer listens on the call/offer queue forever, responding with SDPs to calls that have been agreed to.

TODO(https://github.com/viamrobotics/core/issues/104): This should be authorized for robots only.

func (*SignalingServer) Call

Call is a request/offer to start a call with the connected answerer.
