envelope

package
v0.0.0-...-05a97e5 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 12, 2020 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package envelope is a message oriented API to build envelopes

Constants

This section is empty.

Variables

var (
	// MaxChunkSize is the maximum allowed size for a chunk. Any API taking
	// a chunk will refuse one that is bigger
	MaxChunkSize = 1024 * 768

	// ChunkSplitThreadhold is the chunk size above which a payload will be
	// split into chunks
	ChunkSplitThreadhold = 1024 * 250

	// DefaultChunkSize is the chunk size used when content is automatically
	// split
	DefaultChunkSize = 1024 * 50

	// ErrCheckum is returned by addChunk if a checksum error is detected
	ErrCheckum = errors.New("Checksum error")

	// ErrClosedEnvelope is returned by addMessage if the envelope is
	// already closed
	ErrClosedEnvelope = errors.New("Envelope is closed")

	// ErrClosedMessage is returned by Message readers/writers if the message is
	// already closed
	ErrClosedMessage = errors.New("Message is closed")
)
var ErrTimeout = errors.New("Timeout")

ErrTimeout ...
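
The three size variables declared above suggest a simple splitting rule. The following is a minimal sketch of that rule, not the package's actual code: `splitPayload` is a hypothetical helper, and the constants are local copies of the documented values. It assumes a payload is left as one chunk up to the split threshold and cut into default-sized chunks beyond it.

```go
package main

import "fmt"

// Local copies of the documented package values.
const (
	chunkSplitThreshold = 1024 * 250 // mirrors ChunkSplitThreadhold
	defaultChunkSize    = 1024 * 50  // mirrors DefaultChunkSize
)

// splitPayload is a hypothetical helper, not part of the package. It keeps
// the payload as a single chunk when it is at most chunkSplitThreshold bytes
// long, and otherwise cuts it into pieces of at most defaultChunkSize bytes.
func splitPayload(payload []byte) [][]byte {
	if len(payload) <= chunkSplitThreshold {
		return [][]byte{payload}
	}
	var chunks [][]byte
	for len(payload) > 0 {
		n := defaultChunkSize
		if len(payload) < n {
			n = len(payload)
		}
		chunks = append(chunks, payload[:n])
		payload = payload[n:]
	}
	return chunks
}

func main() {
	small := splitPayload(make([]byte, 1024*100)) // below the threshold
	big := splitPayload(make([]byte, 1024*300))   // above the threshold
	fmt.Println(len(small), len(big))             // prints "1 6"
}
```

With these values, automatically split chunks always stay far below MaxChunkSize, so they would never be refused by an API taking a chunk.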

Functions

func Complete

func Complete(e Envelope, timeout time.Duration) (*api.Envelope, error)

Complete accumulates the complete envelope as an api.Envelope

func CompleteWithContext

func CompleteWithContext(ctx context.Context, e Envelope) (*api.Envelope, error)

CompleteWithContext accumulates the complete envelope as an api.Envelope

func ForEachChunk

func ForEachChunk(msg Message, fn func([]byte) error) error

ForEachChunk calls a function for each chunk of a message

func ForEachChunkString

func ForEachChunkString(msg Message, fn func(string) error) error

ForEachChunkString calls a function for each chunk of a message, passing the chunk as a string

func ForEachMessage

func ForEachMessage(env Envelope, fn func(Message) error) error

ForEachMessage calls a function for all messages of an envelope.

func MustNewMessageChunkWriter

func MustNewMessageChunkWriter(msgType string) (Message, ChunkWriter)

MustNewMessageChunkWriter returns a Message and a chunk writer to write its content. It panics if a UUID cannot be generated. The chunk writer gives control over the chunk sizes, and will refuse chunks that are too big. The Close method must be called to end the stream.

func NewChunkReaderReader

func NewChunkReaderReader(chunkReader ChunkReader) io.Reader

NewChunkReaderReader returns an io.Reader that consumes a ChunkReader

func NewMessageChunkWriter

func NewMessageChunkWriter(msgType string) (Message, ChunkWriter, error)

NewMessageChunkWriter returns a Message and a chunk writer to write its content. The chunk writer gives control over the chunk sizes, and will refuse chunks that are too big. The Close method must be called to end the stream.

func NewMessageChunkWriterWithID

func NewMessageChunkWriterWithID(
	id api.UUID, msgType string, checksum api.Checksum,
) (Message, ChunkWriter)

NewMessageChunkWriterWithID returns a Message and a chunk writer to write its content. If checksum is non-zero, it will be checked when the chunk writer is closed. The chunk writer gives control over the chunk sizes, and will refuse chunks that are too big. The Close method must be called to end the stream.

func NewStreamMessageWriter

func NewStreamMessageWriter(addChunks func([][]byte) error) io.WriteCloser

NewStreamMessageWriter returns an io.WriteCloser that automatically splits the written data into chunks
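
A writer of this kind can be sketched as follows. This is an illustration under assumptions, not the package's code: `splitWriter` is a hypothetical type, the chunk size is a small arbitrary value so the effect is visible, and the buffering strategy (emit full chunks on Write, flush the remainder on Close) is a guess at reasonable behavior.

```go
package main

import (
	"fmt"
	"io"
)

// splitWriter is a hypothetical io.WriteCloser that buffers written bytes,
// hands every full chunk to addChunks, and flushes the remainder on Close.
type splitWriter struct {
	chunkSize int
	buf       []byte
	addChunks func([][]byte) error
}

func (w *splitWriter) Write(p []byte) (int, error) {
	w.buf = append(w.buf, p...)
	var full [][]byte
	for len(w.buf) >= w.chunkSize {
		// Copy so that emitted chunks do not alias the internal buffer.
		chunk := make([]byte, w.chunkSize)
		copy(chunk, w.buf)
		full = append(full, chunk)
		w.buf = w.buf[w.chunkSize:]
	}
	if len(full) > 0 {
		if err := w.addChunks(full); err != nil {
			return 0, err
		}
	}
	return len(p), nil
}

func (w *splitWriter) Close() error {
	if len(w.buf) == 0 {
		return nil
	}
	last := w.buf
	w.buf = nil
	return w.addChunks([][]byte{last})
}

func main() {
	var sizes []int
	var w io.WriteCloser = &splitWriter{
		chunkSize: 4,
		addChunks: func(chunks [][]byte) error {
			for _, c := range chunks {
				sizes = append(sizes, len(c))
			}
			return nil
		},
	}
	io.WriteString(w, "0123456789")
	w.Close()
	fmt.Println(sizes) // prints [4 4 2]
}
```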

func ReadAll

func ReadAll(m Message) ([]byte, error)

ReadAll returns a message's full content as a []byte

func ReadAllAsString

func ReadAllAsString(m Message) (string, error)

ReadAllAsString returns a message's full content as a string

func ReadAllChunks

func ReadAllChunks(reader ChunkReader) ([][]byte, error)

ReadAllChunks returns all the chunks of a ChunkReader

Types

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

Builder is the default Envelope implementation

func MustNewOpenedEnvelope

func MustNewOpenedEnvelope() *Builder

MustNewOpenedEnvelope creates an envelope on which messages can be added

func NewBuilder

func NewBuilder(id api.UUID) *Builder

NewBuilder creates a new Builder

func NewOpenedEnvelope

func NewOpenedEnvelope() (*Builder, error)

NewOpenedEnvelope creates an envelope on which messages can be added

func NewOpenedEnvelopeWithID

func NewOpenedEnvelopeWithID(id api.UUID) *Builder

NewOpenedEnvelopeWithID creates an envelope on which messages can be added

func (*Builder) Add

func (e *Builder) Add(m Message) error

Add adds a message to the envelope. The envelope must be opened.

func (*Builder) Clone

func (e *Builder) Clone() (Envelope, error)

Clone clones the builder. The builder must be closed for the clone to make any sense (for now)

func (*Builder) Close

func (e *Builder) Close() (err error)

Close properly closes the envelope

func (*Builder) CloseWithError

func (e *Builder) CloseWithError(err error) (outErr error)

CloseWithError closes the envelope with an error. If the error is not ErrClosedEnvelope, all the messages are put in an error state.

func (*Builder) ID

func (e *Builder) ID() api.UUID

ID returns the envelope ID

func (*Builder) NextMessage

func (e *Builder) NextMessage(wait bool) (Message, error)

NextMessage returns the next available message

func (*Builder) Opened

func (e *Builder) Opened() (opened bool)

Opened returns true if the builder still accepts new messages

type BuilderFromFragment

type BuilderFromFragment struct {
	Complete bool
	// contains filtered or unexported fields
}

BuilderFromFragment builds an Envelope from a series of api.Envelope fragments.

func NewFromFragments

func NewFromFragments(envelopeID api.UUID) *BuilderFromFragment

NewFromFragments returns an Envelope that can be fed with a stream of envelope fragments. After all fragments are received, the function returns io.EOF

func (*BuilderFromFragment) Add

func (eb *BuilderFromFragment) Add(fragment *api.Envelope) error

Add adds a fragment

func (*BuilderFromFragment) Cancel

func (eb *BuilderFromFragment) Cancel(err error) error

Cancel stops the builder and sets an error on the envelope

func (*BuilderFromFragment) Done

func (eb *BuilderFromFragment) Done() <-chan struct{}

Done returns a channel that will be closed when the reception of fragments is complete

func (*BuilderFromFragment) Envelope

func (eb *BuilderFromFragment) Envelope() Envelope

Envelope returns the envelope being built

type ChunkReader

type ChunkReader interface {
	ReadString() (string, error)
	ReadBytes() ([]byte, error)
	DataAvailable() bool
}

ChunkReader allows reading a message chunk by chunk, with the chunks exactly as they were written by a ChunkWriter

type ChunkReaderReader

type ChunkReaderReader struct {
	// contains filtered or unexported fields
}

ChunkReaderReader implements io.Reader on top of ChunkReader

func (*ChunkReaderReader) Read

func (r *ChunkReaderReader) Read(buf []byte) (int, error)

Read implements the io.Reader interface

type ChunkWriter

type ChunkWriter interface {
	io.Closer
	Write([]byte) error
	WriteString(string) error
}

ChunkWriter writes the chunks of a message. The chunk sizes are kept, and the exact same chunks can be read using a ChunkReader when reading the message. A single chunk should be small; a chunk that is too big will make the resulting message impossible to send on the bus. The recommended maximum size is approximately 768K.
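
The contract described above (chunk boundaries are preserved, oversized chunks are refused) can be illustrated with a small in-memory writer. This is a sketch only: `boundedChunkWriter` and `errChunkTooBig` are hypothetical names, the limit is a local copy of MaxChunkSize, and the error returned by the real package for an oversized chunk is not documented.

```go
package main

import (
	"errors"
	"fmt"
)

// maxChunkSize is a local copy of the documented MaxChunkSize value.
const maxChunkSize = 1024 * 768

var (
	// errChunkTooBig is a hypothetical error; the package's real error
	// for oversized chunks is not documented.
	errChunkTooBig = errors.New("chunk too big")
	// errClosed stands in for ErrClosedMessage.
	errClosed = errors.New("Message is closed")
)

// boundedChunkWriter keeps each written chunk as-is, so a reader can later
// get back the exact same chunk boundaries.
type boundedChunkWriter struct {
	chunks [][]byte
	closed bool
}

func (w *boundedChunkWriter) Write(chunk []byte) error {
	if w.closed {
		return errClosed
	}
	if len(chunk) > maxChunkSize {
		return errChunkTooBig
	}
	// Copy the chunk so later changes by the caller do not affect it.
	w.chunks = append(w.chunks, append([]byte(nil), chunk...))
	return nil
}

func (w *boundedChunkWriter) WriteString(s string) error {
	return w.Write([]byte(s))
}

func (w *boundedChunkWriter) Close() error {
	w.closed = true
	return nil
}

func main() {
	w := &boundedChunkWriter{}
	fmt.Println(w.WriteString("header"))                // accepted
	fmt.Println(w.Write(make([]byte, maxChunkSize+1))) // refused
	w.Close()
	fmt.Println(w.WriteString("late")) // refused: already closed
	fmt.Println(len(w.chunks))         // only the first chunk was kept
}
```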

func NewStreamMessageChunkWriter

func NewStreamMessageChunkWriter(addChunks func([][]byte) error) ChunkWriter

NewStreamMessageChunkWriter returns a ChunkWriter that outputs one chunk for each call to Write

type Envelope

type Envelope interface {
	ID() api.UUID

	// Returns the next message if any. It is a blocking call if 'wait' is true,
	// and all calls after the last message return an 'EOF' error
	NextMessage(wait bool) (Message, error)

	// Clone the envelope and all the messages in it.
	// The clone can be read without altering the original envelope read cursors.
	// Clone must be called before any call to "NextMessage"
	// Warning: using clone on big streamed envelopes can use a LOT of memory
	Clone() (Envelope, error)

	// Close the reading of the envelope and set a custom error.
	// The error cannot be ErrClosedEnvelope. Any subsequent
	// read or write in the envelope will fail with the error that was passed.
	CloseWithError(error) error
}

Envelope holds messages. The messages can be written and read on the fly, which makes the envelope more like a buffer holder than a message holder. It also means the envelope can be read only once, unless cloned first
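
The read-once behavior follows from the cursor-style NextMessage call. The sketch below shows the iteration pattern on a simplified stand-in, not on the real types: `messageSource`, `sliceEnvelope` and `forEachMessage` are hypothetical, messages are plain strings, and the 'EOF' error is assumed to be io.EOF.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// messageSource is a simplified stand-in for Envelope: messages are plain
// strings here, and the end of the envelope is assumed to be io.EOF.
type messageSource interface {
	NextMessage(wait bool) (string, error)
}

// sliceEnvelope is a toy envelope backed by a slice. Reading consumes it.
type sliceEnvelope struct{ msgs []string }

func (e *sliceEnvelope) NextMessage(wait bool) (string, error) {
	if len(e.msgs) == 0 {
		return "", io.EOF
	}
	m := e.msgs[0]
	e.msgs = e.msgs[1:]
	return m, nil
}

// forEachMessage drains the source, calling fn for each message. It stops
// cleanly on io.EOF and returns any other error to the caller.
func forEachMessage(env messageSource, fn func(string) error) error {
	for {
		msg, err := env.NextMessage(true)
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return err
		}
		if err := fn(msg); err != nil {
			return err
		}
	}
}

func main() {
	env := &sliceEnvelope{msgs: []string{"first", "second"}}
	count := 0
	visit := func(string) error { count++; return nil }

	_ = forEachMessage(env, visit)
	fmt.Println("after first pass:", count)

	// The read cursor is already at the end, so nothing more is visited.
	_ = forEachMessage(env, visit)
	fmt.Println("after second pass:", count)
}
```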

func MustNewEnvelope

func MustNewEnvelope(messages ...Message) Envelope

MustNewEnvelope creates an envelope and panics if the UUID creation fails, which should be virtually never

func NewEnvelope

func NewEnvelope(messages ...Message) (Envelope, error)

NewEnvelope creates an envelope. The only possible error is when it fails to generate a UUID (which is unlikely), so you may consider MustNewEnvelope.

func NewEnvelopeWithID

func NewEnvelopeWithID(id api.UUID, messages ...Message) Envelope

NewEnvelopeWithID creates an envelope with the given ID

type Message

type Message interface {
	ID() api.UUID
	MsgType() string
	Checksum() api.Checksum
	// Clone the message. Cannot be called if any content access method was
	// already called
	Clone() Message

	Reader() io.Reader
	ChunkReader() ChunkReader
}

Message is typed and holds some data

func MustNewBytesMessage

func MustNewBytesMessage(msgType string, content []byte) Message

MustNewBytesMessage creates a message and panics if a message UUID cannot be generated, which should be virtually never

func MustNewTextMessage

func MustNewTextMessage(msgType string, content string) Message

MustNewTextMessage creates a message and panics if a message UUID cannot be generated, which should be virtually never

func NewBytesMessage

func NewBytesMessage(msgType string, content []byte) (Message, error)

NewBytesMessage creates a message with a simple []byte content. The content, if too big, may be split into smaller chunks.

func NewBytesMessageWithID

func NewBytesMessageWithID(id api.UUID, msgType string, content []byte) (Message, error)

NewBytesMessageWithID creates a message with a simple []byte content. The content, if too big, may be split into smaller chunks.

func NewMessageWriter

func NewMessageWriter(msgType string) (Message, io.WriteCloser, error)

NewMessageWriter returns a Message and a writer to write its content. The Close method must be called to end the stream.

func NewMessageWriterWithID

func NewMessageWriterWithID(id api.UUID, msgType string) (Message, io.WriteCloser)

NewMessageWriterWithID returns a Message and a writer to write its content. The Close method must be called to end the stream.

func NewTextMessage

func NewTextMessage(msgType string, content string) (Message, error)

NewTextMessage creates a message with a simple string content. The content, if too big, may be split into smaller chunks.

func NewTextMessageWithID

func NewTextMessageWithID(id api.UUID, msgType string, content string) (Message, error)

NewTextMessageWithID creates a message with a simple string content. The content, if too big, may be split into smaller chunks.

type MessageCloser

type MessageCloser interface {
	CloseWithError(err error) error
	Message
}

MessageCloser is a Message that should be closed.

type SimpleMessage

type SimpleMessage struct {
	// contains filtered or unexported fields
}

SimpleMessage is a non-streamed implementation of Message

func (*SimpleMessage) Checksum

func (m *SimpleMessage) Checksum() api.Checksum

Checksum returns the message checksum

func (*SimpleMessage) ChunkReader

func (m *SimpleMessage) ChunkReader() ChunkReader

ChunkReader returns a ChunkReader on the content

func (*SimpleMessage) Clone

func (m *SimpleMessage) Clone() Message

Clone the message

func (*SimpleMessage) ID

func (m *SimpleMessage) ID() api.UUID

ID returns the message ID

func (*SimpleMessage) MsgType

func (m *SimpleMessage) MsgType() string

MsgType returns the message type

func (*SimpleMessage) Reader

func (m *SimpleMessage) Reader() io.Reader

Reader returns an io.Reader on the content

type StreamMessage

type StreamMessage struct {
	// contains filtered or unexported fields
}

StreamMessage is a streamed message

func NewStreamMessage

func NewStreamMessage(id api.UUID, msgType string) *StreamMessage

NewStreamMessage creates a StreamMessage

func (*StreamMessage) Checksum

func (m *StreamMessage) Checksum() (c api.Checksum)

Checksum returns the message checksum if known, 0 otherwise

func (*StreamMessage) ChunkReader

func (m *StreamMessage) ChunkReader() ChunkReader

ChunkReader returns a ChunkReader for the message

func (*StreamMessage) Clone

func (m *StreamMessage) Clone() Message

Clone the message. Cannot be called if any content access method was already called

func (*StreamMessage) CloseWithError

func (m *StreamMessage) CloseWithError(err error) error

CloseWithError sets the given error on the message, which will block any subsequent read/write

func (*StreamMessage) ID

func (m *StreamMessage) ID() api.UUID

ID returns the message ID

func (*StreamMessage) MsgType

func (m *StreamMessage) MsgType() string

MsgType returns the message type

func (*StreamMessage) Reader

func (m *StreamMessage) Reader() io.Reader

Reader returns an io.Reader for the message

func (*StreamMessage) SetChecksum

func (m *StreamMessage) SetChecksum(c api.Checksum)

SetChecksum sets the checksum. If called, the calculated checksum will be compared to it on closing
