grpc

package
v0.0.0-...-c7154f0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 30, 2023 License: MIT Imports: 3 Imported by: 0

Documentation

Overview

Package grpc implements core gRPC utilities that are used by this module.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Reader

type Reader struct {
	// Stream is the gRPC stream (client or server side - both are supported).
	// Only the RecvMsg method is required of it.
	Stream interface{ RecvMsg(m interface{}) error }

	// Factory is an implementation to support the stream's receive type.
	// It is called to obtain the ReaderMessage values used by the Reader.
	Factory ReaderMessageFactory
	// contains filtered or unexported fields
}

Reader implements support for using a gRPC stream as an io.Reader, where the caller provides the implementation to transform received messages (in an arbitrary manner) into a binary stream.

It is not intended for cases where the binary stream might be multiplexed with messages that are not otherwise handled. This limitation follows from the behavior of Read, which was chosen for simplicity: Read will return io.EOF as soon as a call to ReaderMessage.Chunk returns false (note: Stream.RecvMsg may also return io.EOF). This implementation may still be useful in such cases; see also the Buffered method.

Example

ExampleReader demonstrates how to use Reader by partially implementing the NetConn method of rc.RemoteControlServer.

// NOTE(review): presumably asserts, on return, that no goroutines were leaked - confirm against testutil
// (the arguments of a deferred call are evaluated immediately, so
// runtime.NumGoroutine() is the count at the start of the example)
defer testutil.CheckNumGoroutines(nil, runtime.NumGoroutine(), false, time.Second*15)

// bound the overall runtime of the example, cancel is deferred to release the context
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*3)
defer cancel()

// this will be our target, that the server will proxy to (via targetConn)
targetConn, targetListener := net.Pipe()
// this can be ignored - it's just mocking the behavior of the target conn
// it implements ping-pong line buffered messages, hello world style
// (each line read is answered with a single "hello <line>!" line)
go func() {
	defer targetListener.Close()
	scanner := bufio.NewScanner(targetListener)
	for ctx.Err() == nil {
		// poll (until cancel) - each read is bounded, so ctx gets re-checked
		if targetListener.SetReadDeadline(time.Now().Add(time.Second*5)) != nil {
			return
		}
		if !scanner.Scan() {
			if errors.Is(scanner.Err(), os.ErrDeadlineExceeded) {
				// a bufio.Scanner stops unrecoverably at the first I/O error,
				// so reusing it would fail immediately on every iteration
				// (busy loop) - a new scanner is required to keep polling
				scanner = bufio.NewScanner(targetListener)
				continue
			}
			// EOF, or any other read error
			return
		}
		// the error is deliberately ignored here, as the write below is checked
		_ = targetListener.SetWriteDeadline(time.Now().Add(time.Second * 15))
		if _, err := fmt.Fprintf(targetListener, "hello %s!\n", bytes.TrimSpace(scanner.Bytes())); err != nil {
			panic(err)
		}
	}
}()

// implement a simple proxy impl with no support for the proper conn flow
// (out carries the handler's result back to the example - it has capacity 1,
// so the handler's single send does not need a ready receiver)
out := make(chan error, 1)
control := mockRemoteControlServer{netConn: func(stream rc.RemoteControl_NetConnServer) (err error) {
	// report the final value of the named result, however the handler returns
	defer func() { out <- err }()

	// pretend there was something useful here, that initialised targetConn
	defer targetConn.Close()

	// reader exposes the request side of the stream as an io.Reader, where
	// only the bytes variant of NetConnRequest is treated as stream data
	reader := grpcutil.Reader{Stream: stream, Factory: grpcutil.NewReaderMessageFactory(func() (value interface{}, chunk func() ([]byte, bool)) {
		var msg rc.NetConnRequest
		value = &msg
		chunk = func() ([]byte, bool) {
			if v, ok := msg.GetData().(*rc.NetConnRequest_Bytes); ok {
				return v.Bytes, true
			}
			return nil, false
		}
		return
	})}

	// writer's callback sends each chunk it is given as a bytes NetConnResponse
	writer := streamutil.ChunkWriter(func(b []byte) (int, error) {
		if err := stream.Send(&rc.NetConnResponse{Data: &rc.NetConnResponse_Bytes{Bytes: b}}); err != nil {
			return 0, err
		}
		return len(b), nil
	})

	// local types, used to combine reader and writer into a single value that
	// has both the Read and Write methods (promoted from the embedded fields)
	type (
		ioReader   io.Reader
		ioWriter   io.Writer
		readWriter struct {
			ioReader
			ioWriter
		}
	)

	err = streamutil.Proxy(ctx, readWriter{&reader, writer}, targetConn)
	// the client in this example only sends bytes messages, so there should be
	// no buffered (unread) message left in the reader
	if v := reader.Buffered(); v != nil {
		panic(v)
	}
	if err != nil {
		return
	}

	err = targetConn.Close()
	if err != nil {
		return
	}

	return
}}

// connect to the server
conn := testutil.NewBufconnClient(0, func(_ *bufconn.Listener, srv *grpc.Server) { rc.RegisterRemoteControlServer(srv, &control) })
defer conn.Close()
stream, err := rc.NewRemoteControlClient(conn).NetConn(ctx)
if err != nil {
	panic(err)
}

// communicates with targetListener over stream
// (sends name as a single line, then expects a newline-terminated response)
pingPong := func(name string) {
	fmt.Printf("send %q\n", name)
	// the payload is name followed by '\n', built in a newly allocated slice
	if err := stream.Send(&rc.NetConnRequest{Data: &rc.NetConnRequest_Bytes{Bytes: append(append(make([]byte, 0, len(name)+1), name...), '\n')}}); err != nil {
		panic(err)
	}
	msg, err := stream.Recv()
	if err != nil {
		panic(err)
	}
	if !bytes.HasSuffix(msg.GetBytes(), []byte("\n")) {
		panic(fmt.Errorf(`unexpected message %q in message: %s`, msg.GetBytes(), msg))
	}
	// the trailing newline is stripped, for display
	fmt.Printf("recv %q\n", msg.GetBytes()[:len(msg.GetBytes())-1])
}

pingPong("one")
pingPong("two")
pingPong("three")
pingPong("four")

// signal that the client has finished sending
if err := stream.CloseSend(); err != nil {
	panic(err)
}

// no further messages are expected, just io.EOF
if msg, err := stream.Recv(); err != io.EOF {
	panic(fmt.Errorf(`unexpected receive: %v %v`, msg, err))
}

// wait for the result of the server-side handler
if err := <-out; err != nil {
	panic(err)
}
Output:

send "one"
recv "hello one!"
send "two"
recv "hello two!"
send "three"
recv "hello three!"
send "four"
recv "hello four!"

func (*Reader) Buffered

func (x *Reader) Buffered() interface{}

Buffered returns any buffered (unread) message from the Reader, e.g. a message that caused ReaderMessage.Chunk to return false. The return value will be the result of ReaderMessage.Value, or nil.

func (*Reader) Read

func (x *Reader) Read(b []byte) (int, error)

Read implements io.Reader using Stream.

type ReaderMessage

type ReaderMessage interface {
	// Value should return a single proto.Message, to be passed into Reader.Stream.RecvMsg.
	Value() interface{}

	// Chunk should extract the stream chunk from Value, or return false, indicating EOF for the Reader.
	// It is the boolean that indicates EOF, rather than the length of the chunk.
	Chunk() ([]byte, bool)
}

ReaderMessage is part of an implementation for use with Reader. Each ReaderMessage value should wrap a single underlying proto.Message (Value), together with the logic to extract data for the binary stream (Chunk).

type ReaderMessageFactory

type ReaderMessageFactory func() ReaderMessage

ReaderMessageFactory initialises a new ReaderMessage for use by Reader.

func NewReaderMessageFactory

func NewReaderMessageFactory(fn func() (value interface{}, chunk func() ([]byte, bool))) ReaderMessageFactory

NewReaderMessageFactory constructs a new ReaderMessageFactory using closures as a quick and dirty way to implement the interface. Note that it's entirely optional, and only provided as a convenience.

Example

ExampleNewReaderMessageFactory demonstrates how to implement ReaderMessageFactory using NewReaderMessageFactory.

package main

import (
	"fmt"

	grpcutil "github.com/joeycumines/sesame/grpc"
	"github.com/joeycumines/sesame/rc"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
)

func main() {
	// build a ReaderMessageFactory for sesame.v1alpha1.NetConnResponse messages,
	// where only the bytes variant of the message carries stream data
	factory := grpcutil.NewReaderMessageFactory(func() (interface{}, func() ([]byte, bool)) {
		msg := new(rc.NetConnResponse)
		return msg, func() ([]byte, bool) {
			switch data := msg.GetData().(type) {
			case *rc.NetConnResponse_Bytes:
				return data.Bytes, true
			default:
				return nil, false
			}
		}
	})

	// a factory would normally be set as the Factory field of a Reader - it is
	// called directly here, purely to demonstrate the behavior

	fmt.Println(`using factory impl. for sesame.v1alpha1.NetConnResponse...`)

	inputs := []string{
		`{"bytes":"c29tZSBkYXRh"}`,
		`{"bytes":""}`,
		`{"bytes":"MTIz"}`,
		`{"conn":{}}`,
		`{}`,
	}
	for _, input := range inputs {
		// decode the JSON into a new message, then print what Chunk extracts
		readerMessage := factory()
		if err := protojson.Unmarshal([]byte(input), readerMessage.Value().(proto.Message)); err != nil {
			panic(err)
		}
		data, ok := readerMessage.Chunk()
		fmt.Printf("\nmsg %s;\n-> %v;\n-> ok=%v chunk=%q;\n", input, readerMessage.Value(), ok, data)
	}
}
Output:

using factory impl. for sesame.v1alpha1.NetConnResponse...

msg {"bytes":"c29tZSBkYXRh"};
-> bytes:"some data";
-> ok=true chunk="some data";

msg {"bytes":""};
-> bytes:"";
-> ok=true chunk="";

msg {"bytes":"MTIz"};
-> bytes:"123";
-> ok=true chunk="123";

msg {"conn":{}};
-> conn:{};
-> ok=false chunk="";

msg {};
-> ;
-> ok=false chunk="";

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL