drpcstream

Version: v0.0.33
Published: Apr 20, 2023 License: MIT Imports: 13 Imported by: 2

README

package drpcstream

import "storj.io/drpc/drpcstream"

Package drpcstream sends protobufs using the drpc wire protocol.

Stream state machine diagram

Documentation

Overview

Package drpcstream sends protobufs using the drpc wire protocol.

![Stream state machine diagram](./state.png)

Types

type Options added in v0.0.7

type Options struct {
	// SplitSize controls the default size at which packets are split into frames.
	SplitSize int

	// ManualFlush controls if the stream will automatically flush after every
	// message send. Note that flushing is not part of the drpc.Stream
	// interface, so if you use this you must be ready to type assert and
	// call RawFlush dynamically.
	ManualFlush bool

	// MaximumBufferSize causes the Stream to drop any internal buffers that
	// are larger than this amount to control maximum memory usage at the
	// expense of more allocations. 0 is unlimited.
	MaximumBufferSize int

	// Internal contains options that are for internal use only.
	Internal drpcopts.Stream
}

Options controls configuration settings for a stream.
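
To make the SplitSize comment concrete, here is a minimal sketch of cutting one payload into frames of bounded size. It is not the drpcwire implementation: `splitFrames` is a hypothetical helper, and treating a non-positive size as "do not split" is an assumption made only for this example.

```go
package main

import "fmt"

// splitFrames is a hypothetical helper, not part of drpcstream or drpcwire.
// It cuts data into consecutive chunks of at most splitSize bytes.
func splitFrames(data []byte, splitSize int) [][]byte {
	if splitSize <= 0 {
		// Assumption for this sketch: a non-positive size means "do not split".
		return [][]byte{data}
	}
	var frames [][]byte
	for len(data) > splitSize {
		frames = append(frames, data[:splitSize])
		data = data[splitSize:]
	}
	// The final frame carries whatever remains (empty only for empty input).
	return append(frames, data)
}

func main() {
	for i, f := range splitFrames([]byte("0123456789"), 4) {
		fmt.Printf("frame %d: %q\n", i, f)
	}
}
```

A smaller split size yields more frames per message, and a larger one yields fewer, bigger frames.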

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents an rpc actively happening on a transport.

func New

func New(ctx context.Context, sid uint64, wr *drpcwire.Writer) *Stream

New returns a new stream bound to the context with the given stream id and will use the writer to write messages on. It is important to use monotonically increasing stream ids within a single transport.

func NewWithOptions added in v0.0.7

func NewWithOptions(ctx context.Context, sid uint64, wr *drpcwire.Writer, opts Options) *Stream

NewWithOptions returns a new stream bound to the context with the given stream id and will use the writer to write messages on. It is important to use monotonically increasing stream ids within a single transport. The options are used to control details of how the Stream operates.
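
One way to satisfy the monotonically increasing stream id requirement is a per-transport counter. The sketch below uses only the standard library. `idAllocator` is a hypothetical type, not something drpcstream provides, and starting at 1 is an arbitrary choice for the example.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// idAllocator is a hypothetical helper, not part of drpcstream.
// One allocator would be kept per transport.
type idAllocator struct {
	last uint64
}

// next returns an id strictly greater than every id returned before it,
// even when called from several goroutines.
func (a *idAllocator) next() uint64 {
	return atomic.AddUint64(&a.last, 1)
}

func main() {
	var ids idAllocator
	fmt.Println(ids.next(), ids.next(), ids.next()) // prints "1 2 3"
}
```

Each value returned by `next` could then be passed as the `sid` argument when creating a stream on that transport.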

func (*Stream) Cancel added in v0.0.7

func (s *Stream) Cancel(err error) bool

Cancel transitions the stream into a state where all writes to the transport will return the provided error, and terminates the stream. It is a no-op if the stream is already finished, and returns a boolean indicating if that was the case.

func (*Stream) Close

func (s *Stream) Close() (err error)

Close terminates the stream and notifies the remote that the stream has been closed. It is a no-op if the stream is already terminated.

func (*Stream) CloseSend

func (s *Stream) CloseSend() (err error)

CloseSend informs the remote that no more messages will be sent. If the remote has also already issued a CloseSend, the stream is terminated. It is a no-op if the stream already has sent a CloseSend or if it is terminated.
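
The half-close rule described for CloseSend can be modeled with two flags. This is a simplified illustration, not the drpcstream state machine: the `halfClose` type and its methods are hypothetical, and other ways of terminating a stream (errors, cancellation, Close) are left out.

```go
package main

import "fmt"

// halfClose is a hypothetical model, not the drpcstream state machine.
type halfClose struct {
	sendClosed bool // we told the remote we are done sending
	recvClosed bool // the remote told us it is done sending
}

// closeSend records the local half-close. It reports whether a close
// message would need to be sent, so repeated calls are no-ops.
func (h *halfClose) closeSend() bool {
	if h.sendClosed {
		return false
	}
	h.sendClosed = true
	return true
}

// remoteCloseSend records that the remote will send no more messages.
func (h *halfClose) remoteCloseSend() { h.recvClosed = true }

// terminated is true once both directions have been closed.
func (h *halfClose) terminated() bool { return h.sendClosed && h.recvClosed }

func main() {
	var h halfClose
	h.remoteCloseSend()
	fmt.Println(h.terminated()) // false: only the remote has closed
	fmt.Println(h.closeSend())  // true: first local CloseSend takes effect
	fmt.Println(h.terminated()) // true: both sides have closed
	fmt.Println(h.closeSend())  // false: second call is a no-op
}
```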

func (*Stream) Context

func (s *Stream) Context() context.Context

Context returns the context associated with the stream. It is canceled when the Stream will no longer issue any writes or reads.

func (*Stream) Finished added in v0.0.7

func (s *Stream) Finished() <-chan struct{}

Finished returns a channel that is closed when the stream is fully finished and will no longer issue any writes or reads.

func (*Stream) HandlePacket added in v0.0.5

func (s *Stream) HandlePacket(pkt drpcwire.Packet) (err error)

HandlePacket advances the stream state machine by inspecting the packet. It returns any major errors that should terminate the transport the stream is operating on.

func (*Stream) ID

func (s *Stream) ID() uint64

ID returns the stream id.

func (*Stream) IsFinished added in v0.0.16

func (s *Stream) IsFinished() bool

IsFinished returns true if the stream is fully finished and will no longer issue any writes or reads.
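
Finished and IsFinished expose the same condition in two forms: a channel to wait on and a boolean to poll. A common Go pattern for that pairing is a channel that is closed exactly once. The sketch below shows the pattern with a hypothetical `signal` type and makes no claim about how drpcstream implements it internally.

```go
package main

import (
	"fmt"
	"sync"
)

// signal is a hypothetical one-shot event, not a drpcstream type.
type signal struct {
	once sync.Once
	ch   chan struct{}
}

func newSignal() *signal { return &signal{ch: make(chan struct{})} }

// set closes the channel; calling it more than once is safe.
func (s *signal) set() { s.once.Do(func() { close(s.ch) }) }

// done returns a channel that is closed after set has been called.
func (s *signal) done() <-chan struct{} { return s.ch }

// isSet polls the channel without blocking.
func (s *signal) isSet() bool {
	select {
	case <-s.ch:
		return true
	default:
		return false
	}
}

func main() {
	fin := newSignal()
	fmt.Println(fin.isSet()) // false
	fin.set()
	<-fin.done() // returns immediately once the channel is closed
	fmt.Println(fin.isSet()) // true
}
```

A caller would typically use the channel form inside a `select` alongside other events and the boolean form for quick checks.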

func (*Stream) IsTerminated added in v0.0.24

func (s *Stream) IsTerminated() bool

IsTerminated returns true if the stream has been terminated.

func (*Stream) MsgRecv

func (s *Stream) MsgRecv(msg drpc.Message, enc drpc.Encoding) (err error)

MsgRecv receives some message data and unmarshals it with enc into msg.

func (*Stream) MsgSend

func (s *Stream) MsgSend(msg drpc.Message, enc drpc.Encoding) (err error)

MsgSend marshals the message with the encoding, writes it, and flushes unless ManualFlush is set.

func (*Stream) RawFlush

func (s *Stream) RawFlush() (err error)

RawFlush flushes any buffers of data.
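
The Options comment notes that flushing is not part of the drpc.Stream interface, so callers must type assert to reach RawFlush. The sketch below shows that optional-interface pattern with stand-in types: `sender`, `rawFlusher`, `bufferedSender`, and `flushIfPossible` are all hypothetical names used for illustration, not drpc APIs.

```go
package main

import "fmt"

// sender stands in for a narrow interface that has no flush method.
type sender interface {
	Send(msg string) error
}

// rawFlusher is the optional capability discovered by type assertion.
type rawFlusher interface {
	RawFlush() error
}

// bufferedSender is a hypothetical implementation of both interfaces.
type bufferedSender struct {
	pending []string
	flushed []string
}

func (b *bufferedSender) Send(msg string) error {
	b.pending = append(b.pending, msg)
	return nil
}

func (b *bufferedSender) RawFlush() error {
	b.flushed = append(b.flushed, b.pending...)
	b.pending = nil
	return nil
}

// flushIfPossible flushes s when it supports RawFlush and reports
// whether the capability was present.
func flushIfPossible(s sender) (bool, error) {
	f, ok := s.(rawFlusher)
	if !ok {
		return false, nil
	}
	return true, f.RawFlush()
}

func main() {
	b := &bufferedSender{}
	_ = b.Send("hello")
	ok, err := flushIfPossible(b)
	fmt.Println(ok, err, b.flushed)
}
```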

func (*Stream) RawRecv

func (s *Stream) RawRecv() (data []byte, err error)

RawRecv returns the raw bytes received for a message.

func (*Stream) RawWrite

func (s *Stream) RawWrite(kind drpcwire.Kind, data []byte) (err error)

RawWrite sends the data bytes with the given kind.

func (*Stream) SendCancel

func (s *Stream) SendCancel(err error) (bool, error)

SendCancel transitions the stream into the canceled state with context.Canceled and sends a cancel error to the remote side for a soft cancel. It is a no-op if the stream is already terminated. It returns true for busy if writes are already blocked and a hard cancel is required.

func (*Stream) SendError

func (s *Stream) SendError(serr error) (err error)

SendError terminates the stream and sends the error to the remote. It is a no-op if the stream is already terminated.

func (*Stream) SetManualFlush added in v0.0.33

func (s *Stream) SetManualFlush(mf bool)

SetManualFlush sets the ManualFlush option. It cannot be called concurrently with any sends or receives on the stream. Example use case:

flusher := stream.(interface{
    GetStream() drpc.Stream
}).GetStream().(interface{
    SetManualFlush(bool)
})

flusher.SetManualFlush(true)
err = stream.Send(&pb.Message{Request: "hello, "})
flusher.SetManualFlush(false)
if err != nil {
    return err
}

// the next send will send both messages in the same write
// to the underlying connection.
err = stream.Send(&pb.Message{Request: "world!"})
if err != nil {
    return err
}
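
The effect of deferring a flush can be seen with the standard library alone. In this sketch a `bufio.Writer` plays the role of the stream's buffer and a counting sink plays the role of the connection. `countingWriter` and `coalesce` are hypothetical names, and the analogy to drpc's buffering is an assumption, not a description of its internals.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// countingWriter records how many Write calls reach the underlying sink.
type countingWriter struct {
	writes int
	data   strings.Builder
}

func (c *countingWriter) Write(p []byte) (int, error) {
	c.writes++
	return c.data.Write(p)
}

// coalesce buffers every message and flushes once at the end. It returns
// the number of writes that reached the sink and the bytes written.
func coalesce(msgs ...string) (int, string, error) {
	sink := &countingWriter{}
	w := bufio.NewWriter(sink)
	for _, m := range msgs {
		if _, err := w.WriteString(m); err != nil {
			return sink.writes, sink.data.String(), err
		}
	}
	if err := w.Flush(); err != nil {
		return sink.writes, sink.data.String(), err
	}
	return sink.writes, sink.data.String(), nil
}

func main() {
	writes, out, err := coalesce("hello, ", "world!")
	fmt.Println(writes, out, err) // prints "1 hello, world! <nil>"
}
```

Both messages reach the sink in a single write because nothing is flushed between them, which mirrors the intent of the SetManualFlush example.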

func (*Stream) String added in v0.0.24

func (s *Stream) String() string

String returns a string representation of the stream.

func (*Stream) Terminated added in v0.0.5

func (s *Stream) Terminated() <-chan struct{}

Terminated returns a channel that is closed when the stream has been terminated.
