Documentation
¶
Overview ¶
Package channel provides fleetspeak.Message passing over interprocess pipes.
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( // MagicTimeout is how long we are willing to wait for a magic // number. Public to support testing. Should only be changed when no // channels are active. MagicTimeout = 5 * time.Minute // MessageTimeout is how long we are willing to wait for a message // body. Public to support testing. Should only be changed when no // channels are active. MessageTimeout = 5 * time.Minute )
Functions ¶
This section is empty.
Types ¶
type Builder ¶
type Builder func() (c *Channel, cancel func())
Builder must return a new Channel connected to the target process, along with a cancel function that should shut down the Channel and release any associated resources.
May return (nil, nil) if the system is shutting down and the RelentlessChannel using the builder should stop. Otherwise, should only return once it has a Channel.
type Channel ¶
type Channel struct { In <-chan *fspb.Message // Messages received from the other process. Will be closed when the underlying pipe is closed. Out chan<- *fspb.Message // Messages to send to the other process. Close to shutdown the Channel. Err <-chan error // Any errors encountered. // contains filtered or unexported fields }
Channel handles the communication of fspb.Messages over interprocess pipes.
NOTE: once any error occurs, the channel may be only partially functional. In that case, the channel should be shutdown and recreated.
In particular, once an error is written to Err, the user of Channel is responsible for ensuring that any current operations against the provided io.Reader and io.Writer interfaces will unblock and terminate.
func New ¶
func New(pr io.ReadCloser, pw io.WriteCloser) *Channel
New instantiates a Channel. pr and pw will be closed when the Channel is shutdown.
type RelentlessAcknowledger ¶
type RelentlessAcknowledger struct { In <-chan service.AckMessage // Wraps Channel.In. // contains filtered or unexported fields }
RelentlessAcknowledger partially wraps a Channel. It assumes that the other end of the Channel is attached to a RelentlessChannel and implements the acknowledgement protocol which RelentlessChannel expects.
Once a Channel is so wrapped, the caller should read from RelentlessAcknowledger.In instead of Channel.In. The resulting AckMessages should be acknowledged in order to inform the attached RelentlessChannel that the message was successfully handled.
func NewRelentlessAcknowledger ¶
func NewRelentlessAcknowledger(c *Channel, size int) *RelentlessAcknowledger
NewRelentlessAcknowledger creates a RelentlessAcknowledger wrapping c, buffered to smoothly handle 'size' simultaneously unacknowledged messages.
func (*RelentlessAcknowledger) Stop ¶
func (a *RelentlessAcknowledger) Stop()
Stop stops the RelentlessAcknowledger and closes a.In.
type RelentlessChannel ¶
type RelentlessChannel struct { In <-chan *fspb.Message // Messages received from the other process. Out chan<- service.AckMessage // Messages to send to the other process. Close to shutdown. // contains filtered or unexported fields }
A RelentlessChannel is like a Channel, but relentless. Essentially it wraps a Channel, which it recreates on error. Furthermore, it maintains a collection of messages which have not been acknowledged, and resends them after channel recreation. It also provides a mechanism for messages sent through it to be acknowledged by the other side of the channel.
func NewRelentlessChannel ¶
func NewRelentlessChannel(b Builder) *RelentlessChannel
NewRelentlessChannel returns a RelentlessChannel which wraps Builder, and uses it to create channels.