Documentation
¶
Overview ¶
Package grpc implements core gRPC utilities that are used by this module.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Reader ¶
// Reader exposes a gRPC stream as an io.Reader, using a caller-provided
// Factory to turn received messages into a binary stream.
type Reader struct {
	// Stream is the gRPC stream (client or server side - both are supported).
	Stream interface{ RecvMsg(m interface{}) error }

	// Factory is an implementation to support the stream's receive type.
	Factory ReaderMessageFactory

	// contains filtered or unexported fields
}
Reader implements support for using a gRPC stream as an io.Reader, where the caller provides the implementation to transform received messages (in an arbitrary manner) into a binary stream.
It is not intended to be used in cases where the binary stream might be multiplexed with unhandled messages. This restriction follows from a behavior that was chosen for simplicity: Read will return io.EOF as soon as a call to ReaderMessage.Chunk returns false (note: Stream.RecvMsg may also return io.EOF). This implementation may still be useful in such cases; see also the Buffered method.
Example ¶
ExampleReader demonstrates how to use Reader by partially implementing the NetConn method of rc.RemoteControlServer.
// NOTE(review): presumably a goroutine-leak check that runs on exit - confirm against testutil
defer testutil.CheckNumGoroutines(nil, runtime.NumGoroutine(), false, time.Second*15)

// bound the whole example, so that a hang fails instead of blocking forever
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*3)
defer cancel()

// this will be our target, that the server will proxy to (via targetConn)
targetConn, targetListener := net.Pipe()

// this can be ignored - it's just mocking the behavior of the target conn
// it implements ping-pong line buffered messages, hello world style
go func() {
	defer targetListener.Close()
	scanner := bufio.NewScanner(targetListener)
	for ctx.Err() == nil {
		// poll (until cancel)
		if targetListener.SetReadDeadline(time.Now().Add(time.Second*5)) != nil {
			return
		}
		if !scanner.Scan() {
			// a read deadline expiring just means "nothing yet" - go around and check ctx again
			if errors.Is(scanner.Err(), os.ErrDeadlineExceeded) {
				continue
			}
			return
		}
		_ = targetListener.SetWriteDeadline(time.Now().Add(time.Second * 15))
		// reply to each received line with a greeting line
		if _, err := fmt.Fprintf(targetListener, "hello %s!\n", bytes.TrimSpace(scanner.Bytes())); err != nil {
			panic(err)
		}
	}
}()

// implement a simple proxy impl with no support for the proper conn flow
// out is buffered (1), so the handler can report its result without blocking
out := make(chan error, 1)
control := mockRemoteControlServer{netConn: func(stream rc.RemoteControl_NetConnServer) (err error) {
	// publish the handler's final (named) result, for the assertion at the end of the example
	defer func() { out <- err }()

	// pretend there was something useful here that initialised targetConn
	defer targetConn.Close()

	// reader turns received NetConnRequest messages into a byte stream
	reader := grpcutil.Reader{Stream: stream, Factory: grpcutil.NewReaderMessageFactory(func() (value interface{}, chunk func() ([]byte, bool)) {
		var msg rc.NetConnRequest
		value = &msg
		chunk = func() ([]byte, bool) {
			// only the bytes variant carries stream data, anything else ends the stream
			if v, ok := msg.GetData().(*rc.NetConnRequest_Bytes); ok {
				return v.Bytes, true
			}
			return nil, false
		}
		return
	})}

	// writer sends each written chunk as a NetConnResponse carrying those bytes
	writer := streamutil.ChunkWriter(func(b []byte) (int, error) {
		if err := stream.Send(&rc.NetConnResponse{Data: &rc.NetConnResponse_Bytes{Bytes: b}}); err != nil {
			return 0, err
		}
		return len(b), nil
	})

	// readWriter combines the reader and writer into a single value, via embedding
	type (
		ioReader   io.Reader
		ioWriter   io.Writer
		readWriter struct {
			ioReader
			ioWriter
		}
	)

	err = streamutil.Proxy(ctx, readWriter{&reader, writer}, targetConn)

	// this example expects nothing to be left over in the reader after proxying
	if v := reader.Buffered(); v != nil {
		panic(v)
	}

	if err != nil {
		return
	}

	err = targetConn.Close()
	if err != nil {
		return
	}

	return
}}

// connect to the server
conn := testutil.NewBufconnClient(0, func(_ *bufconn.Listener, srv *grpc.Server) { rc.RegisterRemoteControlServer(srv, &control) })
defer conn.Close()

stream, err := rc.NewRemoteControlClient(conn).NetConn(ctx)
if err != nil {
	panic(err)
}

// communicates with targetListener over stream
pingPong := func(name string) {
	fmt.Printf("send %q\n", name)
	// send name with a trailing newline, as a single bytes message
	if err := stream.Send(&rc.NetConnRequest{Data: &rc.NetConnRequest_Bytes{Bytes: append(append(make([]byte, 0, len(name)+1), name...), '\n')}}); err != nil {
		panic(err)
	}
	msg, err := stream.Recv()
	if err != nil {
		panic(err)
	}
	// the reply is expected to end with a newline, which is stripped before printing
	if !bytes.HasSuffix(msg.GetBytes(), []byte("\n")) {
		panic(fmt.Errorf(`unexpected message %q in message: %s`, msg.GetBytes(), msg))
	}
	fmt.Printf("recv %q\n", msg.GetBytes()[:len(msg.GetBytes())-1])
}

pingPong("one")
pingPong("two")
pingPong("three")
pingPong("four")

// close the sending side once all the pings have been sent
if err := stream.CloseSend(); err != nil {
	panic(err)
}

// the stream is expected to end with io.EOF, without any further messages
if msg, err := stream.Recv(); err != io.EOF {
	panic(fmt.Errorf(`unexpected receive: %v %v`, msg, err))
}

// finally, the handler itself must have returned without error
if err := <-out; err != nil {
	panic(err)
}
Output: send "one" recv "hello one!" send "two" recv "hello two!" send "three" recv "hello three!" send "four" recv "hello four!"
type ReaderMessage ¶
// ReaderMessage pairs a single underlying proto.Message (Value) with the
// logic that extracts binary stream data from it (Chunk), for use with Reader.
type ReaderMessage interface {
	// Value should return a single proto.Message, to be passed into Reader.Stream.RecvMsg.
	Value() interface{}

	// Chunk should extract the stream chunk from Value, or return false, indicating EOF for the Reader.
	Chunk() ([]byte, bool)
}
ReaderMessage is part of an implementation for use with Reader. Each ReaderMessage value should wrap a single underlying proto.Message (Value), and logic to extract data for the binary stream (Chunk).
type ReaderMessageFactory ¶
// ReaderMessageFactory initialises a new ReaderMessage for use by Reader.
type ReaderMessageFactory func() ReaderMessage
ReaderMessageFactory initialises a new ReaderMessage for use by Reader.
func NewReaderMessageFactory ¶
func NewReaderMessageFactory(fn func() (value interface{}, chunk func() ([]byte, bool))) ReaderMessageFactory
NewReaderMessageFactory constructs a new ReaderMessageFactory using closures as a quick and dirty way to implement the interface. Note that it's entirely optional, and only provided as a convenience.
Example ¶
ExampleNewReaderMessageFactory demonstrates how to implement ReaderMessageFactory using NewReaderMessageFactory.
package main import ( "fmt" grpcutil "github.com/joeycumines/sesame/grpc" "github.com/joeycumines/sesame/rc" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" ) func main() { // construct a ReaderMessageFactory to handle sesame.v1alpha1.NetConnResponse messages factory := grpcutil.NewReaderMessageFactory(func() (value interface{}, chunk func() ([]byte, bool)) { var msg rc.NetConnResponse value = &msg chunk = func() ([]byte, bool) { if v, ok := msg.GetData().(*rc.NetConnResponse_Bytes); ok { return v.Bytes, true } return nil, false } return }) // factory would normally be used as a grpc.Reader's Factory field, but just to demonstrate the behavior... unmarshal := func(s string) grpcutil.ReaderMessage { msg := factory() if err := protojson.Unmarshal([]byte(s), msg.Value().(proto.Message)); err != nil { panic(err) } return msg } p := func(s string) { msg := unmarshal(s) b, ok := msg.Chunk() fmt.Printf("\nmsg %s;\n-> %v;\n-> ok=%v chunk=%q;\n", s, msg.Value(), ok, b) } fmt.Println(`using factory impl. for sesame.v1alpha1.NetConnResponse...`) p(`{"bytes":"c29tZSBkYXRh"}`) p(`{"bytes":""}`) p(`{"bytes":"MTIz"}`) p(`{"conn":{}}`) p(`{}`) }
Output: using factory impl. for sesame.v1alpha1.NetConnResponse... msg {"bytes":"c29tZSBkYXRh"}; -> bytes:"some data"; -> ok=true chunk="some data"; msg {"bytes":""}; -> bytes:""; -> ok=true chunk=""; msg {"bytes":"MTIz"}; -> bytes:"123"; -> ok=true chunk="123"; msg {"conn":{}}; -> conn:{}; -> ok=false chunk=""; msg {}; -> ; -> ok=false chunk="";