Documentation ¶
Overview ¶
Package envelope is a message oriented API to build envelopes
Index ¶
- Variables
- func Complete(e Envelope, timeout time.Duration) (*api.Envelope, error)
- func CompleteWithContext(ctx context.Context, e Envelope) (*api.Envelope, error)
- func ForEachChunk(msg Message, fn func([]byte) error) error
- func ForEachChunkString(msg Message, fn func(string) error) error
- func ForEachMessage(env Envelope, fn func(Message) error) error
- func MustNewMessageChunkWriter(msgType string) (Message, ChunkWriter)
- func NewChunkReaderReader(chunkReader ChunkReader) io.Reader
- func NewMessageChunkWriter(msgType string) (Message, ChunkWriter, error)
- func NewMessageChunkWriterWithID(id api.UUID, msgType string, checksum api.Checksum) (Message, ChunkWriter)
- func NewStreamMessageWriter(addChunks func([][]byte) error) io.WriteCloser
- func ReadAll(m Message) ([]byte, error)
- func ReadAllAsString(m Message) (string, error)
- func ReadAllChunks(reader ChunkReader) ([][]byte, error)
- type Builder
- func (e *Builder) Add(m Message) error
- func (e *Builder) Clone() (Envelope, error)
- func (e *Builder) Close() (err error)
- func (e *Builder) CloseWithError(err error) (outErr error)
- func (e *Builder) ID() api.UUID
- func (e *Builder) NextMessage(wait bool) (Message, error)
- func (e *Builder) Opened() (opened bool)
- type BuilderFromFragment
- type ChunkReader
- type ChunkReaderReader
- type ChunkWriter
- type Envelope
- type Message
- func MustNewBytesMessage(msgType string, content []byte) Message
- func MustNewTextMessage(msgType string, content string) Message
- func NewBytesMessage(msgType string, content []byte) (Message, error)
- func NewBytesMessageWithID(id api.UUID, msgType string, content []byte) (Message, error)
- func NewMessageWriter(msgType string) (Message, io.WriteCloser, error)
- func NewMessageWriterWithID(id api.UUID, msgType string) (Message, io.WriteCloser)
- func NewTextMessage(msgType string, content string) (Message, error)
- func NewTextMessageWithID(id api.UUID, msgType string, content string) (Message, error)
- type MessageCloser
- type SimpleMessage
- type StreamMessage
- func (m *StreamMessage) Checksum() (c api.Checksum)
- func (m *StreamMessage) ChunkReader() ChunkReader
- func (m *StreamMessage) Clone() Message
- func (m *StreamMessage) CloseWithError(err error) error
- func (m *StreamMessage) ID() api.UUID
- func (m *StreamMessage) MsgType() string
- func (m *StreamMessage) Reader() io.Reader
- func (m *StreamMessage) SetChecksum(c api.Checksum)
Constants ¶
This section is empty.
Variables ¶
var ( // MaxChunkSize is the maximum allowed size for a chunk. If bigger, any // api taking a chunk will refuse it MaxChunkSize = 1024 * 768 // ChunkSplitThreadhold is the chunk size above which a paylod will be // splitted in chunks ChunkSplitThreadhold = 1024 * 250 // DefaultChunkSize is the chunk size when a content is automatically // splitted DefaultChunkSize = 1024 * 50 // ErrCheckum is returned by addChunk if a checksum error is detected ErrCheckum = errors.New("Checksum error") // ErrClosedEnvelope is returned by addMessage if the envelope is // already closed ErrClosedEnvelope = errors.New("Envelope is closed") // ErrClosedMessage is returned by Message readers/writers if the message is // already closed ErrClosedMessage = errors.New("Message is closed") )
var ErrTimeout = errors.New("Timeout")
ErrTimeout ...
Functions ¶
func CompleteWithContext ¶
CompleteWithContext accumulate the complete envelope as a api.Envelope
func ForEachChunk ¶
ForEachChunk calls a function for each chunk of a message
func ForEachChunkString ¶
ForEachChunkString calls a function for each chunk of a message
func ForEachMessage ¶
ForEachMessage call a function for all messages of an envelope.
func MustNewMessageChunkWriter ¶
func MustNewMessageChunkWriter(msgType string) (Message, ChunkWriter)
MustNewMessageChunkWriter returns a Message and a chunk writer to write its content. It panics if a UUID cannot be generated The check writer gives controls on the chunk sizes, and will refuse chunks that are too big The Close method must be called to end the stream
func NewChunkReaderReader ¶
func NewChunkReaderReader(chunkReader ChunkReader) io.Reader
NewChunkReaderReader returns a io.Reader that consumes a ChunkReader
func NewMessageChunkWriter ¶
func NewMessageChunkWriter(msgType string) (Message, ChunkWriter, error)
NewMessageChunkWriter returns a Message and a chunk writer to write its content. The check writer gives controls on the chunk sizes, and will refuse chunks that are too big The Close method must be called to end the stream
func NewMessageChunkWriterWithID ¶
func NewMessageChunkWriterWithID( id api.UUID, msgType string, checksum api.Checksum, ) (Message, ChunkWriter)
NewMessageChunkWriterWithID returns a Message and a chunk writer to write its content. If checksum is non-zero, it will be checked when the chunk writer is closed. The check writer gives controls on the chunk sizes, and will refuse chunks that are too big The Close method must be called to end the stream
func NewStreamMessageWriter ¶
func NewStreamMessageWriter(addChunks func([][]byte) error) io.WriteCloser
NewStreamMessageWriter implements io.WriteCloser and auto-split in chunks
func ReadAllAsString ¶
ReadAllAsString returns a message full content as a string
func ReadAllChunks ¶
func ReadAllChunks(reader ChunkReader) ([][]byte, error)
ReadAllChunks returns all the chunks of a ChunkReader
Types ¶
type Builder ¶
type Builder struct {
// contains filtered or unexported fields
}
Builder is the default Envelope implementation
func MustNewOpenedEnvelope ¶
func MustNewOpenedEnvelope() *Builder
MustNewOpenedEnvelope creates an envelope on which messages can be added
func NewOpenedEnvelope ¶
NewOpenedEnvelope creates an envelope on which messages can be added
func NewOpenedEnvelopeWithID ¶
NewOpenedEnvelopeWithID creates an envelope on which messages can be added
func (*Builder) CloseWithError ¶
CloseWithError closes the envelope with an error. If the error is not ErrClosedEnvelope then all the messages are set in error state
func (*Builder) NextMessage ¶
NextMessage returns the next available message
type BuilderFromFragment ¶
type BuilderFromFragment struct { Complete bool // contains filtered or unexported fields }
BuilderFromFragment builds an Envelope from a serie of api.Envelope fragments.
func NewFromFragments ¶
func NewFromFragments(envelopeID api.UUID) *BuilderFromFragment
NewFromFragments returns a Envelope that can be fed with a stream of envelope fragments. After all fragments are received, the functions returns io.EOF
func (*BuilderFromFragment) Add ¶
func (eb *BuilderFromFragment) Add(fragment *api.Envelope) error
Add adds a fragment
func (*BuilderFromFragment) Cancel ¶
func (eb *BuilderFromFragment) Cancel(err error) error
Cancel stops builder and set an error on the envelope
func (*BuilderFromFragment) Done ¶
func (eb *BuilderFromFragment) Done() <-chan struct{}
Done returns a channel that will be closed with the reception of fragments is complete
func (*BuilderFromFragment) Envelope ¶
func (eb *BuilderFromFragment) Envelope() Envelope
Envelope returns the envelope being built
type ChunkReader ¶
type ChunkReader interface { ReadString() (string, error) ReadBytes() ([]byte, error) DataAvailable() bool }
ChunkReader allow reading a messages by chunks as they were written by a ChunkWriter
type ChunkReaderReader ¶
type ChunkReaderReader struct {
// contains filtered or unexported fields
}
ChunkReaderReader implements io.Reader on top of ChunkReader
type ChunkWriter ¶
ChunkWriter writes chunks of message. The chunk sizes are kept and the exact same chunks can be read using a ChunkReader when reading the message. A single chunk should be small, and if too big will make the resulting message impossible to send on the bus. The recommanded maximum size is approximatively 768K.
func NewStreamMessageChunkWriter ¶
func NewStreamMessageChunkWriter(addChunks func([][]byte) error) ChunkWriter
NewStreamMessageChunkWriter implements ChunkWriter and output a chunk for reach call to Write
type Envelope ¶
type Envelope interface { ID() api.UUID // Returns the next message if any. It is a blocking call if 'wait' is true, // and all calls after the last message returns a 'EOF' error NextMessage(wait bool) (Message, error) // Clone the envelope and all the message in it. // The clone can be read without altering the original envelope read cursors. // Clone must be called before any call to "NextMessage" // Warning: using clone on big streamed envelopes can use a LOT of memory Clone() (Envelope, error) // Close the reading of the envelope and set a custom error. // The error cannot be ErrClosedEnvelope. Any subsequent // read or write in the envelope will fail with the error that was passed. CloseWithError(error) error }
Envelope holds messages. The messages can be written and read on the fly, which makes the envelope more like a buffer holder than a message holder. It also means the envelope can be read only once, unless cloned first
func MustNewEnvelope ¶
MustNewEnvelope creates an envelope and panics if the UUID creation fails, which should be virtually never
func NewEnvelope ¶
NewEnvelope creates an envelope. The only possible error is when it fails to generate a UUID (which is unlikely), so you may consider MustNewEnvelope.
type Message ¶
type Message interface { ID() api.UUID MsgType() string Checksum() api.Checksum // Clone the message. Cannot be called if any content access method was // already called Clone() Message Reader() io.Reader ChunkReader() ChunkReader }
Message is typed and holds some data
func MustNewBytesMessage ¶
MustNewBytesMessage creates a message and panics if a message UUID cannot be generated, which should be virtually never
func MustNewTextMessage ¶
MustNewTextMessage creates a message and panics if a message UUID cannot be generated, which should be virtually never
func NewBytesMessage ¶
NewBytesMessage creates a message with a simple []byte content The content, if too big, may be splitted in smaller chunks
func NewBytesMessageWithID ¶
NewBytesMessageWithID creates a message with a simple []byte content The content, if too big, may be splitted in smaller chunks
func NewMessageWriter ¶
func NewMessageWriter(msgType string) (Message, io.WriteCloser, error)
NewMessageWriter returns a Message and a writer to write its content The Close method must be called to end the stream
func NewMessageWriterWithID ¶
NewMessageWriterWithID returns a Message and a writer to write its content The Close method must be called to end the stream
func NewTextMessage ¶
NewTextMessage creates a message with a simple string content The content, if too big, may be splitted in smaller chunks
type MessageCloser ¶
MessageCloser is a Message that should be closed.
type SimpleMessage ¶
type SimpleMessage struct {
// contains filtered or unexported fields
}
SimpleMessage is a non-streamed implementation of Message
func (*SimpleMessage) Checksum ¶
func (m *SimpleMessage) Checksum() api.Checksum
Checksum returns the message checksum
func (*SimpleMessage) ChunkReader ¶
func (m *SimpleMessage) ChunkReader() ChunkReader
ChunkReader returns a ChunkReader on the content
func (*SimpleMessage) MsgType ¶
func (m *SimpleMessage) MsgType() string
MsgType returns the message type
func (*SimpleMessage) Reader ¶
func (m *SimpleMessage) Reader() io.Reader
Reader returns a io.Reader on the content
type StreamMessage ¶
type StreamMessage struct {
// contains filtered or unexported fields
}
StreamMessage is a streamed message
func NewStreamMessage ¶
func NewStreamMessage(id api.UUID, msgType string) *StreamMessage
NewStreamMessage creates a StreamMessage
func (*StreamMessage) Checksum ¶
func (m *StreamMessage) Checksum() (c api.Checksum)
Checksum returns the message checksum if known, 0 otherwise
func (*StreamMessage) ChunkReader ¶
func (m *StreamMessage) ChunkReader() ChunkReader
ChunkReader returns a ChunkReader for the message
func (*StreamMessage) Clone ¶
func (m *StreamMessage) Clone() Message
Clone the message. Cannot be called if any content access method was already called
func (*StreamMessage) CloseWithError ¶
func (m *StreamMessage) CloseWithError(err error) error
CloseWithError set a the given error on the message, which will block any subsequent read/write
func (*StreamMessage) MsgType ¶
func (m *StreamMessage) MsgType() string
MsgType returns the message type
func (*StreamMessage) Reader ¶
func (m *StreamMessage) Reader() io.Reader
Reader returns a io.Reader for the message
func (*StreamMessage) SetChecksum ¶
func (m *StreamMessage) SetChecksum(c api.Checksum)
SetChecksum sets the checksum. If called, the calculated checksum will be compared to it on closing