Documentation ¶
Index ¶
- Constants
- Variables
- func Chunk(data []byte) [][]byte
- func ChunkReader(r io.Reader, f func([]byte) error) (int, error)
- func Collect[T proto.Message](cs ClientStream[T], max int) (ret []T, _ error)
- func ForEach[T proto.Message](cs ClientStream[T], fn func(x T) error) error
- func GetBuffer() []byte
- func NewStreamingBytesReader(streamingBytesClient StreamingBytesClient, cancel context.CancelFunc) io.ReadCloser
- func NewTestClient(t testing.TB, regFunc func(*grpc.Server)) *grpc.ClientConn
- func PutBuffer(buf []byte)
- func Read[T proto.Message](cs ClientStream[T], buf []T) (int, error)
- func ScrubGRPC(err error) error
- func WithStreamingBytesWriter(streamingBytesServer StreamingBytesServer, cb func(io.Writer) error) (retErr error)
- func WriteFromStreamingBytesClient(streamingBytesClient StreamingBytesClient, writer io.Writer) error
- func WriteToStreamingBytesServer(reader io.Reader, server StreamingBytesServer) error
- type BufPool
- type ChunkWriteCloser
- type ClientStream
- type Interceptor
- type PachdAddress
- type ReaderWrapper
- type Server
- type StreamingBytesClient
- type StreamingBytesServer
Constants ¶
const (
	// DefaultPachdNodePort is the pachd kubernetes service's default
	// NodePort.Port setting
	DefaultPachdNodePort = 30650
)
Variables ¶
var (
	// ErrNoPachdAddress is returned by ParsePachdAddress when the input is an
	// empty string
	ErrNoPachdAddress = errors.New("no pachd address specified")
	// DefaultPachdAddress is the default PachdAddress that should be used
	// if none is otherwise specified. It's a loopback that should rely on
	// port forwarding.
	DefaultPachdAddress = PachdAddress{
		Secured: false,
		Host:    "0.0.0.0",
		Port:    DefaultPachdNodePort,
	}
)
var (
	// MaxMsgSize is used to define the GRPC frame size.
	MaxMsgSize = 20 * units.MiB
	// MaxMsgPayloadSize is the max message payload size.
	// This is slightly less than MaxMsgSize to account
	// for the GRPC message wrapping the payload.
	MaxMsgPayloadSize = MaxMsgSize - units.MiB
)
Functions ¶
func Chunk ¶
func Chunk(data []byte) [][]byte
Chunk splits a piece of data into smaller pieces. This is useful for data that is larger than MaxMsgPayloadSize.
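To illustrate the idea, here is a minimal sketch of splitting a byte slice into bounded pieces. The name chunkBytes and its explicit size parameter are assumptions made for this example; the real Chunk takes only the data, and the limit it applies is not shown on this page.

```go
package main

import "fmt"

// chunkBytes is a hypothetical stand-in for Chunk. It splits data into
// consecutive sub-slices of at most size bytes each. The sub-slices alias
// data; nothing is copied. size must be positive.
func chunkBytes(data []byte, size int) [][]byte {
	if size <= 0 {
		panic("chunkBytes: size must be positive")
	}
	var chunks [][]byte
	for len(data) > size {
		chunks = append(chunks, data[:size])
		data = data[size:]
	}
	if len(data) > 0 {
		chunks = append(chunks, data)
	}
	return chunks
}

func main() {
	// Ten bytes with a limit of four yield chunks of 4, 4 and 2 bytes.
	for _, c := range chunkBytes([]byte("abcdefghij"), 4) {
		fmt.Printf("%q\n", c)
	}
}
```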
func ChunkReader ¶
func ChunkReader(r io.Reader, f func([]byte) error) (int, error)
ChunkReader splits a reader into reasonably sized chunks for the purpose of transmitting the chunks over gRPC. For each chunk, it calls the given function.
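The read-and-callback loop described above could look roughly like the following sketch. The names chunkReader and chunkSizes, the explicit size parameter, and the choice to return the total byte count are all assumptions for illustration; the page does not say what the int returned by ChunkReader means.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// chunkReader is a hypothetical stand-in for ChunkReader. It reads r in
// pieces of at most size bytes, calls f once per piece, and returns the
// total number of bytes handed to f. The buffer is reused between calls,
// so f must not retain the slice. size must be positive.
func chunkReader(r io.Reader, size int, f func([]byte) error) (int, error) {
	buf := make([]byte, size)
	total := 0
	for {
		n, err := r.Read(buf)
		if n > 0 {
			if ferr := f(buf[:n]); ferr != nil {
				return total, ferr
			}
			total += n
		}
		if err == io.EOF {
			return total, nil
		}
		if err != nil {
			return total, err
		}
	}
}

// chunkSizes reports the length of every chunk produced for s.
func chunkSizes(s string, size int) ([]int, int, error) {
	var sizes []int
	total, err := chunkReader(strings.NewReader(s), size, func(b []byte) error {
		sizes = append(sizes, len(b))
		return nil
	})
	return sizes, total, err
}

func main() {
	sizes, total, err := chunkSizes("abcdefghij", 4)
	fmt.Println(sizes, total, err)
}
```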
func Collect ¶
func Collect[T proto.Message](cs ClientStream[T], max int) (ret []T, _ error)
Collect reads at most max elements from cs, and returns them as a slice.
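A dependency-free sketch of the collecting loop follows. The recvStream interface and sliceStream type are hypothetical stand-ins for ClientStream[T] and a real gRPC stream. The sketch assumes that reaching max simply stops reading; the real Collect may behave differently (for example, by reporting an error) when more elements are available.

```go
package main

import (
	"fmt"
	"io"
)

// recvStream is a hypothetical stand-in for ClientStream[T]; only Recv
// matters for this sketch.
type recvStream[T any] interface {
	Recv() (T, error)
}

// sliceStream replays a fixed slice and then reports io.EOF, the way a
// gRPC stream signals a clean end.
type sliceStream[T any] struct{ items []T }

func (s *sliceStream[T]) Recv() (T, error) {
	var zero T
	if len(s.items) == 0 {
		return zero, io.EOF
	}
	x := s.items[0]
	s.items = s.items[1:]
	return x, nil
}

// collect gathers elements until the stream ends or max have been read.
// Assumption: hitting max just stops reading.
func collect[T any](cs recvStream[T], max int) ([]T, error) {
	var ret []T
	for len(ret) < max {
		x, err := cs.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return ret, err
		}
		ret = append(ret, x)
	}
	return ret, nil
}

func main() {
	got, err := collect[int](&sliceStream[int]{items: []int{1, 2, 3, 4}}, 3)
	fmt.Println(got, err) // prints "[1 2 3] <nil>"
}
```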
func ForEach ¶
func ForEach[T proto.Message](cs ClientStream[T], fn func(x T) error) error
ForEach calls fn for each element in cs. fn must not retain the element passed to it.
func GetBuffer ¶
func GetBuffer() []byte
GetBuffer returns a buffer. The buffer may or may not be freshly allocated, and it may or may not be zeroed.
func NewStreamingBytesReader ¶
func NewStreamingBytesReader(streamingBytesClient StreamingBytesClient, cancel context.CancelFunc) io.ReadCloser
NewStreamingBytesReader returns an io.ReadCloser for a StreamingBytesClient.
func NewTestClient ¶
func NewTestClient(t testing.TB, regFunc func(*grpc.Server)) *grpc.ClientConn
func Read ¶
func Read[T proto.Message](cs ClientStream[T], buf []T) (int, error)
Read fills buf with received messages from cs and returns the number read.
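The sketch below shows one way such a fill loop could work, using hypothetical stand-ins (recvStream, sliceStream) in place of ClientStream[T]. It assumes that when the stream ends before buf is full, the count so far is returned together with io.EOF; the page does not state how the real Read reports end of stream.

```go
package main

import (
	"fmt"
	"io"
)

// recvStream is a hypothetical stand-in for ClientStream[T].
type recvStream[T any] interface {
	Recv() (T, error)
}

// sliceStream replays a fixed slice and then reports io.EOF.
type sliceStream[T any] struct{ items []T }

func (s *sliceStream[T]) Recv() (T, error) {
	var zero T
	if len(s.items) == 0 {
		return zero, io.EOF
	}
	x := s.items[0]
	s.items = s.items[1:]
	return x, nil
}

// read fills buf from cs and returns how many slots were filled.
// Assumption: a stream that ends early yields the partial count and io.EOF.
func read[T any](cs recvStream[T], buf []T) (int, error) {
	for i := range buf {
		x, err := cs.Recv()
		if err != nil {
			return i, err
		}
		buf[i] = x
	}
	return len(buf), nil
}

func main() {
	s := &sliceStream[string]{items: []string{"a", "b", "c"}}
	buf := make([]string, 2)

	n, err := read[string](s, buf)
	fmt.Println(n, err) // prints "2 <nil>"

	n, err = read[string](s, buf)
	fmt.Println(n, err) // prints "1 EOF"
}
```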
func ScrubGRPC ¶
func ScrubGRPC(err error) error
ScrubGRPC removes GRPC error code information from 'err' if it came from GRPC, and returns 'err' unchanged otherwise.
func WithStreamingBytesWriter ¶
func WithStreamingBytesWriter(streamingBytesServer StreamingBytesServer, cb func(io.Writer) error) (retErr error)
WithStreamingBytesWriter sets up a scoped streaming bytes writer that buffers and chunks writes. TODO: This should probably use a buffer pool eventually.
func WriteFromStreamingBytesClient ¶
func WriteFromStreamingBytesClient(streamingBytesClient StreamingBytesClient, writer io.Writer) error
WriteFromStreamingBytesClient writes from the StreamingBytesClient to the io.Writer.
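As a rough illustration, draining a byte stream into a writer can be sketched as below. The bytesStream interface is a hypothetical simplification that yields raw []byte instead of *wrapperspb.BytesValue so the example needs no protobuf dependency; fakeStream, writeFromStream and drain are likewise names invented for this sketch.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// bytesStream is a hypothetical stand-in for StreamingBytesClient.
type bytesStream interface {
	Recv() ([]byte, error)
}

// fakeStream replays fixed chunks and then reports io.EOF.
type fakeStream struct{ chunks [][]byte }

func (f *fakeStream) Recv() ([]byte, error) {
	if len(f.chunks) == 0 {
		return nil, io.EOF
	}
	c := f.chunks[0]
	f.chunks = f.chunks[1:]
	return c, nil
}

// writeFromStream copies every received chunk to w until the stream
// reports io.EOF, which is treated as a clean end rather than an error.
func writeFromStream(s bytesStream, w io.Writer) error {
	for {
		chunk, err := s.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if _, err := w.Write(chunk); err != nil {
			return err
		}
	}
}

// drain streams the given chunks into a buffer and returns the result.
func drain(chunks ...string) (string, error) {
	s := &fakeStream{}
	for _, c := range chunks {
		s.chunks = append(s.chunks, []byte(c))
	}
	var out bytes.Buffer
	err := writeFromStream(s, &out)
	return out.String(), err
}

func main() {
	out, err := drain("hel", "lo")
	fmt.Println(out, err) // prints "hello <nil>"
}
```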
func WriteToStreamingBytesServer ¶
func WriteToStreamingBytesServer(reader io.Reader, server StreamingBytesServer) error
WriteToStreamingBytesServer writes the data from the io.Reader to the StreamingBytesServer. TODO: Unused. Remove?
Types ¶
type BufPool ¶
BufPool is a wrapper around sync.Pool that makes it a little nicer to use for []byte by doing the casting for you and defining the `New` function.
func NewBufPool ¶
NewBufPool creates a new BufPool that returns buffers of the given size.
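The wrapper pattern described for BufPool can be sketched with the standard library alone. Everything below (bufPool, newBufPool, getBuffer, putBuffer) is a hypothetical illustration of wrapping sync.Pool for []byte, not the package's actual code or method names.

```go
package main

import (
	"fmt"
	"sync"
)

// bufPool is a hypothetical sketch: a sync.Pool whose New function
// allocates a []byte of a fixed size, with typed get/put helpers so that
// callers never deal with type assertions.
type bufPool struct {
	pool sync.Pool
}

// newBufPool creates a pool that hands out buffers of the given size.
func newBufPool(size int) *bufPool {
	return &bufPool{pool: sync.Pool{
		New: func() any { return make([]byte, size) },
	}}
}

// getBuffer returns a buffer of the pool's size. It may be recycled, so
// its contents are unspecified.
func (b *bufPool) getBuffer() []byte {
	return b.pool.Get().([]byte)
}

// putBuffer hands buf back to the pool, restoring its full length first
// so the next caller sees a buffer of the expected size.
func (b *bufPool) putBuffer(buf []byte) {
	b.pool.Put(buf[:cap(buf)])
}

func main() {
	p := newBufPool(16)
	buf := p.getBuffer()
	fmt.Println(len(buf)) // prints "16"
	p.putBuffer(buf[:3])  // a shortened slice is restored to full length
	fmt.Println(len(p.getBuffer()))
}
```

Because a recycled buffer keeps whatever the previous user wrote into it, callers should treat the contents as garbage and overwrite before reading.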
type ChunkWriteCloser ¶
type ChunkWriteCloser struct {
// contains filtered or unexported fields
}
ChunkWriteCloser is a utility for buffering writes into buffers obtained from a buffer pool. The ChunkWriteCloser will buffer up to the capacity of a buffer obtained from a buffer pool, then execute a callback that will receive the buffered data. The ChunkWriteCloser will get a new buffer from the pool for subsequent writes, so it is expected that the callback will return the buffer to the pool.
func NewChunkWriteCloser ¶
func NewChunkWriteCloser(bufPool *BufPool, f func(chunk []byte) error) *ChunkWriteCloser
NewChunkWriteCloser creates a new ChunkWriteCloser.
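The buffering behaviour described for ChunkWriteCloser can be sketched as follows. This is a simplified, hypothetical version: chunkWriter allocates fresh buffers with make instead of drawing them from a BufPool, and the helper chunksOf exists only to show which chunks the callback receives.

```go
package main

import "fmt"

// chunkWriter is a hypothetical sketch of the ChunkWriteCloser idea.
type chunkWriter struct {
	newBuf func() []byte // returns an empty buffer with fixed capacity
	buf    []byte
	f      func(chunk []byte) error
}

// newChunkWriter creates a writer that emits chunks of at most capacity
// bytes. capacity must be positive.
func newChunkWriter(capacity int, f func([]byte) error) *chunkWriter {
	newBuf := func() []byte { return make([]byte, 0, capacity) }
	return &chunkWriter{newBuf: newBuf, buf: newBuf(), f: f}
}

// Write appends p to the current buffer and emits a chunk every time the
// buffer reaches its capacity.
func (w *chunkWriter) Write(p []byte) (int, error) {
	written := 0
	for len(p) > 0 {
		n := copy(w.buf[len(w.buf):cap(w.buf)], p)
		w.buf = w.buf[:len(w.buf)+n]
		p = p[n:]
		written += n
		if len(w.buf) == cap(w.buf) {
			if err := w.flush(); err != nil {
				return written, err
			}
		}
	}
	return written, nil
}

// Close emits any partially filled buffer.
func (w *chunkWriter) Close() error {
	if len(w.buf) == 0 {
		return nil
	}
	return w.flush()
}

// flush hands the current buffer to the callback and starts a new one.
// From this point the callback owns the chunk.
func (w *chunkWriter) flush() error {
	chunk := w.buf
	w.buf = w.newBuf()
	return w.f(chunk)
}

// chunksOf performs the given writes and returns the chunks emitted.
func chunksOf(capacity int, writes ...string) []string {
	var out []string
	w := newChunkWriter(capacity, func(c []byte) error {
		out = append(out, string(c))
		return nil
	})
	for _, s := range writes {
		if _, err := w.Write([]byte(s)); err != nil {
			panic(err)
		}
	}
	if err := w.Close(); err != nil {
		panic(err)
	}
	return out
}

func main() {
	fmt.Println(chunksOf(4, "abc", "defgh", "i")) // prints "[abcd efgh i]"
}
```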
type ClientStream ¶
type ClientStream[T proto.Message] interface {
	Recv() (T, error)
	grpc.ClientStream
}
type Interceptor ¶
type Interceptor struct {
	UnaryServerInterceptor  grpc.UnaryServerInterceptor
	StreamServerInterceptor grpc.StreamServerInterceptor
}
Interceptor can be used to configure Unary and Stream interceptors.
type PachdAddress ¶
type PachdAddress struct {
	// Secured specifies whether grpcs should be used
	Secured bool
	// Host specifies the pachd address host without the port
	Host string
	// Port specifies the pachd port
	Port uint16
	// UnixSocket is set if the pachd address refers to a unix socket
	UnixSocket string
}
PachdAddress represents a parsed pachd address value.
func ParsePachdAddress ¶
func ParsePachdAddress(value string) (*PachdAddress, error)
ParsePachdAddress parses a string into a pachd address, or returns an error if it's invalid.
func (*PachdAddress) Qualified ¶
func (p *PachdAddress) Qualified() string
Qualified returns the "fully qualified" address, including the scheme.
func (*PachdAddress) Target ¶
func (p *PachdAddress) Target() string
Target returns a string suitable for calling grpc.Dial. This may be a host:port pair for TCP connections, or a unix socket address.
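The two cases named above can be illustrated with a small sketch. The address type mirrors the documented PachdAddress fields, but the target method is a guess at the selection logic (unix socket if set, otherwise host:port); the exact strings the real Target produces are not shown on this page, and the socket path used here is made up.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// address mirrors the documented PachdAddress fields.
type address struct {
	Secured    bool
	Host       string
	Port       uint16
	UnixSocket string
}

// target is a hypothetical sketch of Target: it prefers the unix socket
// when one is set and otherwise joins host and port.
func (a address) target() string {
	if a.UnixSocket != "" {
		return a.UnixSocket
	}
	return net.JoinHostPort(a.Host, strconv.Itoa(int(a.Port)))
}

func main() {
	tcp := address{Host: "localhost", Port: 30650}
	fmt.Println(tcp.target()) // prints "localhost:30650"

	sock := address{UnixSocket: "unix:///tmp/pachd.sock"} // made-up path
	fmt.Println(sock.target())
}
```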
type ReaderWrapper ¶
ReaderWrapper wraps a reader for the following reason: Go's io.CopyBuffer has an annoying optimization wherein if the reader has the WriteTo function defined, it doesn't actually use the given buffer. As a result, we might write a large chunk to the gRPC streaming server even though we intend to use a small buffer. Therefore we wrap readers in this wrapper so that only Read is defined.
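The effect described above can be reproduced with the standard library. In this sketch, readOnly is a hypothetical equivalent of ReaderWrapper, and sizeRecorder and largestWrite are helpers invented to measure the biggest single Write that io.CopyBuffer issues.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// readOnly holds the reader in a named field, so only Read is exposed and
// any WriteTo method on the underlying reader stays hidden.
type readOnly struct {
	r io.Reader
}

func (w readOnly) Read(p []byte) (int, error) { return w.r.Read(p) }

// sizeRecorder remembers the largest single Write it has seen.
type sizeRecorder struct{ largest int }

func (s *sizeRecorder) Write(p []byte) (int, error) {
	if len(p) > s.largest {
		s.largest = len(p)
	}
	return len(p), nil
}

// largestWrite copies data through io.CopyBuffer with a bufSize-byte
// buffer and returns the size of the largest write that reached the
// destination. bufSize must be positive.
func largestWrite(data []byte, wrap bool, bufSize int) int {
	var src io.Reader = bytes.NewReader(data) // *bytes.Reader has WriteTo
	if wrap {
		src = readOnly{r: src}
	}
	rec := &sizeRecorder{}
	if _, err := io.CopyBuffer(rec, src, make([]byte, bufSize)); err != nil {
		panic(err)
	}
	return rec.largest
}

func main() {
	data := make([]byte, 10)
	fmt.Println(largestWrite(data, false, 4)) // prints "10": WriteTo bypasses the buffer
	fmt.Println(largestWrite(data, true, 4))  // prints "4": the buffer bounds each write
}
```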
type Server ¶
Server is a convenience wrapper around gRPC servers that simplifies their setup and execution.
func NewServer ¶
func NewServer(ctx context.Context, publicPortTLSAllowed bool, options ...grpc.ServerOption) (*Server, error)
NewServer creates a new gRPC server, but does not start serving yet.
If 'publicPortTLSAllowed' is set, grpcutil may enable TLS. This should be set for public ports that serve GRPC services to third-party clients. If set, the criterion for actually serving over TLS is: if a signed TLS cert and the corresponding private key are present in 'TLSVolumePath', this will serve GRPC traffic over TLS. If either is missing, this will serve GRPC traffic over unencrypted HTTP.
type StreamingBytesClient ¶
type StreamingBytesClient interface {
Recv() (*wrapperspb.BytesValue, error)
}
StreamingBytesClient represents a client for an rpc method of the form:
rpc Foo(Bar) returns (stream google.protobuf.BytesValue) {}
type StreamingBytesServer ¶
type StreamingBytesServer interface {
Send(bytesValue *wrapperspb.BytesValue) error
}
StreamingBytesServer represents a server for an rpc method of the form:
rpc Foo(Bar) returns (stream google.protobuf.BytesValue) {}