reqresp

package
v0.2.4-0...-f8342f3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2021 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SuccessCode    ResponseCode = 0
	InvalidReqCode              = 1
	ServerErrCode               = 2
)
View Source
const MAX_ERR_SIZE = 256

256 bytes max error size

Variables

This section is empty.

Functions

func EncodeChunk

func EncodeChunk(result ResponseCode, r io.Reader, w io.Writer, comp Compression) error

EncodeChunk reads (decompressed) response message from the msg io.Reader, and writes it as a chunk with given result code to the output writer. The compression is optional and may be nil.

func EncodeHeaderAndPayload

func EncodeHeaderAndPayload(r io.Reader, w io.Writer, comp Compression) error

EncodeHeaderAndPayload reads a payload, buffers (and optionally compresses) the payload, then computes the header-data (varint of byte size). And then writes header and payload.

func EncodeResult

func EncodeResult(result ResponseCode, w io.Writer) error

EncodeResult writes the result code to the output writer.

func StreamChunk

func StreamChunk(result ResponseCode, size uint64, r io.Reader, w io.Writer, comp Compression) error

EncodeChunk reads (decompressed) response message from the msg io.Reader, and writes it as a chunk with given result code to the output writer. The compression is optional and may be nil.

func StreamHeaderAndPayload

func StreamHeaderAndPayload(size uint64, r io.Reader, w io.Writer, comp Compression) error

StreamHeaderAndPayload reads a payload and streams (and optionally compresses) it to the writer. To do so, it requires the (uncompressed) payload length to be known in advance.

Types

type BufLimitReader

type BufLimitReader struct {
	N       int  // max bytes remaining
	PerRead bool // Limit applies per read, i.e. it is not affected at the end of the read.
	// contains filtered or unexported fields
}

Reader implements buffering for an io.Reader object.

func NewBufLimitReader

func NewBufLimitReader(rd io.Reader, size int, limit int) *BufLimitReader

NewBufLimitReader returns a new Reader whose buffer has the specified size. The reader will return an error if Read crosses the limit.

func (*BufLimitReader) Read

func (b *BufLimitReader) Read(p []byte) (n int, err error)

Read reads data into p. It returns the number of bytes read into p. The bytes are taken from at most one Read on the underlying Reader, hence N may be less than len(p). At EOF, the count will be zero and err will be io.EOF.

func (*BufLimitReader) ReadByte

func (b *BufLimitReader) ReadByte() (byte, error)

type ChunkedRequestHandler

type ChunkedRequestHandler interface {
	RequestReader
	RequestResponder
}

type ChunkedResponseHandler

type ChunkedResponseHandler interface {
	ChunkSize() uint64
	ChunkIndex() uint64
	ResultCode() ResponseCode
	ReadRaw() ([]byte, error)
	ReadErrMsg() (string, error)
	ReadObj(dest codec.Deserializable) error
}

type Codec

type Codec interface {
	MinByteLen() uint64
	MaxByteLen() uint64
	Encode(w io.Writer, input codec.Serializable) error
	Decode(r io.Reader, bytesLen uint64, dest codec.Deserializable) error
	Alloc() SerDes
}

type Compression

type Compression interface {
	// Wraps a reader to decompress data as reads happen.
	Decompress(r io.Reader) io.Reader
	// Wraps a writer to compress data as writes happen.
	Compress(w io.WriteCloser) io.WriteCloser
	// Returns an error when the input size is too large to encode.
	MaxEncodedLen(msgLen uint64) (uint64, error)
	// The name of the compression that is suffixed to the actual encoding. E.g. "snappy", w.r.t. "ssz_snappy".
	Name() string
}

type NewStreamFn

type NewStreamFn func(ctx context.Context, peerId peer.ID, protocolId ...protocol.ID) (network.Stream, error)

func (NewStreamFn) Request

func (newStreamFn NewStreamFn) Request(ctx context.Context, peerId peer.ID, protocolId protocol.ID, r io.Reader, comp Compression, handle ResponseHandler) error

type OnRequestListener

type OnRequestListener func(ctx context.Context, peerId peer.ID, handler ChunkedRequestHandler)

type OnRequested

type OnRequested func()

type OnResponseListener

type OnResponseListener func(chunk ChunkedResponseHandler) error

type RPCMethod

type RPCMethod struct {
	Protocol                  protocol.ID
	RequestCodec              Codec
	ResponseChunkCodec        Codec
	DefaultResponseChunkCount uint64
}

func (*RPCMethod) MakeStreamHandler

func (m *RPCMethod) MakeStreamHandler(newCtx StreamCtxFn, comp Compression, listener OnRequestListener) network.StreamHandler

func (*RPCMethod) RunRequest

func (m *RPCMethod) RunRequest(ctx context.Context, newStreamFn NewStreamFn,
	peerId peer.ID, comp Compression, req RequestInput, maxRespChunks uint64, madeRequest func() error,
	onResponse OnResponseListener) error

type ReadRequestFn

type ReadRequestFn func(dest interface{}) error

type Request

type Request interface {
	fmt.Stringer
}

type RequestBytesInput

type RequestBytesInput []byte

func (RequestBytesInput) Reader

func (v RequestBytesInput) Reader(_ Codec) (io.Reader, error)

type RequestInput

type RequestInput interface {
	Reader(c Codec) (io.Reader, error)
}

type RequestPayloadHandler

type RequestPayloadHandler func(ctx context.Context, peerId peer.ID, requestLen uint64, r io.Reader, w io.Writer, comp Compression, invalidInputErr error)

RequestPayloadHandler processes a request (decompressed if previously compressed), read from r. The handler can respond by writing to w. After returning the writer will automatically be closed. If the input is already known to be invalid, e.g. the request size is invalid, then `invalidInputErr != nil`, and r will not read anything more.

func (RequestPayloadHandler) MakeStreamHandler

func (handle RequestPayloadHandler) MakeStreamHandler(newCtx StreamCtxFn, comp Compression, minRequestContentSize, maxRequestContentSize uint64) network.StreamHandler

startReqRPC registers a request handler for the given protocol. Compression is optional and may be nil.

type RequestReader

type RequestReader interface {
	// nil if not an invalid input
	InvalidInput() error
	ReadRequest(dest codec.Deserializable) error
	RawRequest() ([]byte, error)
}

type RequestResponder

type RequestResponder interface {
	WriteResponseChunk(code ResponseCode, data codec.Serializable) error
	WriteRawResponseChunk(code ResponseCode, chunk []byte) error
	StreamResponseChunk(code ResponseCode, size uint64, r io.Reader) error
	WriteErrorChunk(code ResponseCode, msg string) error
}

type RequestSSZInput

type RequestSSZInput struct {
	Obj codec.Serializable
}

func (RequestSSZInput) Reader

func (v RequestSSZInput) Reader(c Codec) (io.Reader, error)

type ResponseChunkHandler

type ResponseChunkHandler func(ctx context.Context, chunkIndex uint64, chunkSize uint64, result ResponseCode, r io.Reader, w io.Writer) error

ResponseChunkHandler is a function that processes a response chunk. The index, size and result-code are already parsed. The contents (decompressed if previously compressed) can be read from r. Optionally an answer can be written back to w. If the response chunk could not be processed, an error may be returned.

func (ResponseChunkHandler) MakeResponseHandler

func (handleChunk ResponseChunkHandler) MakeResponseHandler(maxChunkCount uint64, maxChunkContentSize uint64, comp Compression) ResponseHandler

MakeResponseHandler builds a ResponseHandler, which won't take more than maxChunkCount chunks, or chunk contents larger than maxChunkContentSize. Compression is optional and may be nil. Chunks are processed by the given ResponseChunkHandler.

type ResponseCode

type ResponseCode uint8

type ResponseHandler

type ResponseHandler func(ctx context.Context, r io.Reader, w io.WriteCloser) error

ResponseHandler processes a response by internally processing chunks, any error is propagated up.

type SSZCodec

type SSZCodec struct {
	// contains filtered or unexported fields
}

func NewSSZCodec

func NewSSZCodec(alloc func() SerDes, minByteLen uint64, maxByteLen uint64) *SSZCodec

func (*SSZCodec) Alloc

func (c *SSZCodec) Alloc() SerDes

func (*SSZCodec) Decode

func (c *SSZCodec) Decode(r io.Reader, bytesLen uint64, dest codec.Deserializable) error

func (*SSZCodec) Encode

func (c *SSZCodec) Encode(w io.Writer, input codec.Serializable) error

func (*SSZCodec) MaxByteLen

func (c *SSZCodec) MaxByteLen() uint64

func (*SSZCodec) MinByteLen

func (c *SSZCodec) MinByteLen() uint64

type SerDes

type SerDes interface {
	codec.Serializable
	codec.Deserializable
}

type SnappyCompression

type SnappyCompression struct{}

func (SnappyCompression) Compress

func (SnappyCompression) Decompress

func (c SnappyCompression) Decompress(reader io.Reader) io.Reader

func (SnappyCompression) MaxEncodedLen

func (c SnappyCompression) MaxEncodedLen(msgLen uint64) (uint64, error)

func (SnappyCompression) Name

func (c SnappyCompression) Name() string

type StreamCtxFn

type StreamCtxFn func() context.Context

type WriteMsgFn

type WriteMsgFn func(msg string) error

type WriteSuccessChunkFn

type WriteSuccessChunkFn func(data interface{}) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL