wrpssp

v2.2.21
Warning

This package is not in the latest version of its module.

Published: Apr 23, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

README

wrpssp

WRP Simple Streaming Protocol


Summary

A simple streaming protocol built on top of WRP.

Documentation

Overview

Example
sent := "This is an example of how to use the wrpssp package."

packer, _ := wrpssp.New(
	wrpssp.ID("123"),
	wrpssp.Reader(strings.NewReader(sent)),
	// Split the string into 5 byte packets for the example or there would
	// only be one packet.  Normally this would be a much larger number.
	wrpssp.MaxPacketSize(5),

	// Normally this would be EncodingGzip, but for the example we are using
	// EncodingIdentity so that the packets are not compressed.
	wrpssp.WithEncoding(wrpssp.EncodingIdentity),
)

assembler := wrpssp.Assembler{}

ctx := context.Background()
for {
	// This is a pretty safe way to handle the packetizer.  If the packetizer
	// returns a msg, it should be sent.  The error might be interesting,
	// but it doesn't change what should be sent.
	msg, err := packer.Next(ctx, wrp.Message{
		Type:        wrp.SimpleEventMessageType,
		Source:      "self:",
		Destination: "event:foo",
	})
	if err != nil && !errors.Is(err, io.EOF) {
		panic(err)
	}
	if msg == nil {
		break
	}

	// Source and destination are set above.  A real message would normally
	// also carry a content type, but this example is only interested in the
	// payload, so that field is left unset.

	// Normally the msg would be sent out over the wire, but for this
	// example we are going to simply directly assemble the packets.
	err = assembler.ProcessWRP(ctx, *msg)
	if err != nil {
		break
	}
}

// The assembler will now have the original message.

buf, _ := io.ReadAll(&assembler)

fmt.Println(string(buf))
Output:
This is an example of how to use the wrpssp package.
Example (RequestResponse)
sent := "This is an example of how to use the wrpssp package with a request/response."

// This allows a predictable transaction ID to be used for the example.  Normally
// this would be a random UUID.
var tidCount int

packer, _ := wrpssp.New(
	wrpssp.ID("123"),
	wrpssp.Reader(strings.NewReader(sent)),
	// Split the string into 15-byte packets for the example or there would
	// only be one packet.  Normally this would be a much larger number.
	wrpssp.MaxPacketSize(15),

	// Normally this would be EncodingGzip, but for the example we are using
	// EncodingIdentity so that the packets are not compressed.
	wrpssp.WithEncoding(wrpssp.EncodingIdentity),

	// Normally you'd want to use a normal uuid generator, but for the example
	// we are using a simple counter to generate a predictable transaction ID.
	wrpssp.WithUpdateTransactionUUID(func() (string, error) {
		defer func() {
			tidCount++
		}()
		return fmt.Sprintf("tid-%d", tidCount), nil
	}),
)

assembler := wrpssp.Assembler{}

ctx := context.Background()
for {
	// This is a pretty safe way to handle the packetizer.  If the packetizer
	// returns a msg, it should be sent.  The error might be interesting,
	// but it doesn't change what should be sent.
	msg, err := packer.Next(ctx, wrp.Message{
		Type:        wrp.SimpleRequestResponseMessageType,
		Source:      "self:",
		Destination: "event:foo",
		// TransactionUUID is automatically set by the packetizer.
	})

	if err != nil && !errors.Is(err, io.EOF) {
		panic(err)
	}
	if msg == nil {
		break
	}

	// Source and destination are set above.  A real message would normally
	// also carry a content type, but this example is only interested in the
	// payload, so that field is left unset.

	fmt.Printf("Transaction ID: %s\n", msg.TransactionUUID)

	// Normally the msg would be sent out over the wire, but for this
	// example we are going to simply directly assemble the packets.
	err = assembler.ProcessWRP(ctx, *msg)
	if err != nil {
		break
	}
}

// The assembler will now have the original message.

buf, _ := io.ReadAll(&assembler)

fmt.Println(string(buf))
Output:
Transaction ID: tid-0
Transaction ID: tid-1
Transaction ID: tid-2
Transaction ID: tid-3
Transaction ID: tid-4
Transaction ID: tid-5
This is an example of how to use the wrpssp package with a request/response.


Constants

This section is empty.

Variables

var (
	// ErrInvalidInput is returned when the input to a function is invalid.
	ErrInvalidInput = errors.New("invalid input")

	// ErrClosed is returned when an operation is attempted on a closed stream.
	ErrClosed = errors.New("closed")

	// ErrPacketGapExceeded is returned when a packet is received that is too
	// far ahead of the current packet.
	ErrPacketGapExceeded = errors.New("packet gap exceeded")

	// ErrNotAvailable is returned to indicate that the requested information is
	// not available.
	ErrNotAvailable = errors.New("information not available")
)
var (
	ErrUnsupportedEncoding = errors.New("unsupported encoding")
)
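
Because these are sentinel errors, callers would typically match them with errors.Is, which works whether or not the library wraps them with extra context. The sketch below is not the wrpssp implementation: errPacketGapExceeded is a local stand-in for ErrPacketGapExceeded, and checkGap is a hypothetical helper that applies the documented MaxPacketGap rule (0 means unlimited) so there is an error to match against.

```go
package main

import (
	"errors"
	"fmt"
)

// errPacketGapExceeded is a local stand-in for wrpssp.ErrPacketGapExceeded.
var errPacketGapExceeded = errors.New("packet gap exceeded")

// checkGap is a hypothetical helper (not part of wrpssp). It rejects a
// packet whose number is more than maxGap ahead of the current packet.
// A maxGap of 0 disables the check, mirroring "0 = unlimited".
func checkGap(current, received, maxGap int) error {
	if maxGap > 0 && received-current > maxGap {
		// Wrap the sentinel with %w so errors.Is can still find it.
		return fmt.Errorf("packet %d is %d ahead of packet %d: %w",
			received, received-current, current, errPacketGapExceeded)
	}
	return nil
}

func main() {
	err := checkGap(3, 20, 10)
	fmt.Println(errors.Is(err, errPacketGapExceeded)) // true
	fmt.Println(checkGap(3, 20, 0))                   // <nil>
}
```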

Functions

func GetEstimatedLength added in v2.2.0

func GetEstimatedLength(msg wrp.Message) (uint64, error)

GetEstimatedLength returns the estimated total length of the stream if the message is an SSP message with the stream-estimated-total-length header. Only minimal validation is performed because this function is meant to be a quick lookup. For more thorough validation, use the Is function in this package.

func GetStreamID

func GetStreamID(msg wrp.Message) (string, error)

GetStreamID returns the stream ID of the message if it is an SSP message. Only minimal validation is performed because this function is meant to be a quick lookup. For more thorough validation, use the Is function in this package.

func Is

func Is(msg wrp.Union, validators ...wrp.Processor) bool

Is determines if the given message is a Simple Streaming Protocol message. If wrp.NoStandardValidation() is passed as a validator, then the message is considered a Simple Streaming Protocol message if it has any matching headers.

Types

type Assembler

type Assembler struct {
	Validators []wrp.Processor
	// Maximum allowed gap between current and received packet number (0 = unlimited)
	MaxPacketGap int
	// contains filtered or unexported fields
}

Assembler is a struct that reads from a stream of WRP messages and assembles them into a single stream.

The Assembler implements the io.ReadCloser interface, as well as the wrp.Processor interface.

Thread-safety: The Assembler is safe for concurrent use between one reader and multiple writers. A single goroutine should call Read(), while one or more goroutines may call ProcessWRP() to deliver packets. Multiple concurrent Read() calls are not supported and will result in undefined behavior.
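
The one-reader, many-writers pattern described above can be illustrated with the standard library alone. The sketch below uses io.Pipe as a stand-in for the Assembler (it is not the wrpssp API): several goroutines write concurrently, playing the role of ProcessWRP callers, while a single goroutine reads. A pipe delivers writes in whatever order they arrive, so only the total byte count is meaningful here.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// fanIn starts one writer goroutine per chunk and drains the pipe with a
// single reader. It returns the total number of bytes read.
func fanIn(chunks []string) (int, error) {
	pr, pw := io.Pipe()

	var wg sync.WaitGroup
	for _, c := range chunks {
		wg.Add(1)
		go func(s string) {
			defer wg.Done()
			// Concurrent writes to an io.Pipe are serialized internally.
			_, _ = pw.Write([]byte(s))
		}(c)
	}

	// Close the write side once every writer is done so the reader sees EOF.
	go func() {
		wg.Wait()
		_ = pw.Close()
	}()

	// Exactly one goroutine reads.
	buf, err := io.ReadAll(pr)
	return len(buf), err
}

func main() {
	n, err := fanIn([]string{"ab", "cde", "f"})
	fmt.Println(n, err) // 6 <nil>
}
```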

func (*Assembler) Close

func (a *Assembler) Close() error

Close closes the Assembler and implements the io.Closer interface.

func (*Assembler) ProcessWRP

func (a *Assembler) ProcessWRP(_ context.Context, msg wrp.Message) error

ProcessWRP processes the given WRP message if it is an SSP message. Any other message is ignored.

func (*Assembler) Read

func (a *Assembler) Read(p []byte) (int, error)

Read implements the io.Reader interface.

type Encoding

type Encoding string
const (
	EncodingIdentity               Encoding = "identity"
	EncodingGzip                   Encoding = "gzip"
	EncodingGzipNoCompression      Encoding = "gzip+none"
	EncodingGzipBestSpeed          Encoding = "gzip+fastest"
	EncodingGzipBestCompression    Encoding = "gzip+best"
	EncodingGzipHuffmanOnly        Encoding = "gzip+huffman"
	EncodingDeflate                Encoding = "deflate"
	EncodingDeflateNoCompression   Encoding = "deflate+none"
	EncodingDeflateBestSpeed       Encoding = "deflate+fastest"
	EncodingDeflateBestCompression Encoding = "deflate+best"
	EncodingDeflateHuffmanOnly     Encoding = "deflate+huffman"
)
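
The gzip variant names line up with the compression levels in the standard compress/gzip package. The sketch below assumes that correspondence, which the documentation above does not state, and round-trips a payload at the chosen level. gzipLevel and roundTrip are hypothetical helpers, not part of wrpssp.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// gzipLevel maps an encoding name to a compress/gzip level.
// The mapping is an assumption inferred from the constant names.
func gzipLevel(enc string) (int, bool) {
	switch enc {
	case "gzip":
		return gzip.DefaultCompression, true
	case "gzip+none":
		return gzip.NoCompression, true
	case "gzip+fastest":
		return gzip.BestSpeed, true
	case "gzip+best":
		return gzip.BestCompression, true
	case "gzip+huffman":
		return gzip.HuffmanOnly, true
	}
	return 0, false
}

// roundTrip compresses data at the level for enc, then decompresses it.
func roundTrip(enc string, data []byte) ([]byte, error) {
	level, ok := gzipLevel(enc)
	if !ok {
		return nil, fmt.Errorf("unsupported encoding %q", enc)
	}
	var buf bytes.Buffer
	zw, err := gzip.NewWriterLevel(&buf, level)
	if err != nil {
		return nil, err
	}
	if _, err := zw.Write(data); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	zr, err := gzip.NewReader(&buf)
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	out, err := roundTrip("gzip+fastest", []byte("hello, stream"))
	fmt.Println(string(out), err) // hello, stream <nil>
}
```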

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option is a functional option for the Packetizer.

func EstimatedLength

func EstimatedLength(size int64) Option

EstimatedLength sets the estimated length of the stream. This is optional. If the size is less than 1, the default value of 0 is used.

This field helps the receiver determine the progress of the stream when the stream has a fixed length.
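
As a rough illustration of how a receiver might use the estimate, the sketch below turns a received byte count and an estimated length into a percentage. progress is a hypothetical helper, and treating an estimate of 0 as "unknown" is an assumption based on 0 being the default when no estimate is set.

```go
package main

import "fmt"

// progress returns the percentage of the stream received so far.
// ok is false when no estimate is available (estimated == 0).
func progress(received, estimated uint64) (percent float64, ok bool) {
	if estimated == 0 {
		return 0, false
	}
	p := float64(received) / float64(estimated) * 100
	if p > 100 {
		// The length is only an estimate, so cap the result at 100%.
		p = 100
	}
	return p, true
}

func main() {
	if p, ok := progress(256, 1024); ok {
		fmt.Printf("%.0f%% received\n", p) // 25% received
	}
}
```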

func ID

func ID(id string) Option

ID sets the ID of the stream. The ID must be a non-empty string containing only [A-Za-z0-9 !#$&'()*+,./:;=?@[\]~_-]. This is a required field.
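
The allowed character set can be checked up front with a regular expression. This is a minimal sketch built from the character class quoted above; validID is a hypothetical helper, and the package may validate IDs differently internally.

```go
package main

import (
	"fmt"
	"regexp"
)

// idPattern matches a non-empty string made only of the characters listed
// in the ID documentation. The brackets are escaped inside the class and
// the hyphen is placed last so it is treated literally.
var idPattern = regexp.MustCompile(`^[A-Za-z0-9 !#$&'()*+,./:;=?@\[\]~_-]+$`)

// validID reports whether id satisfies the documented constraints.
func validID(id string) bool {
	return idPattern.MatchString(id)
}

func main() {
	fmt.Println(validID("123"))        // true
	fmt.Println(validID("stream-1_a")) // true
	fmt.Println(validID(""))           // false
	fmt.Println(validID("50%"))        // false
}
```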

func MaxPacketSize

func MaxPacketSize(size int) Option

MaxPacketSize sets the maximum size of a packet. This is optional. If the size is less than 1, the default value of 64KB is used.

func Reader

func Reader(r io.Reader) Option

Reader sets the stream to read from. This is a required field.

func WithEncoding

func WithEncoding(e Encoding) Option

WithEncoding sets the encoding of the stream. This is optional. If the encoding is not set, the default value of EncodingGzip is used.

func WithUpdateTransactionUUID

func WithUpdateTransactionUUID(fn func() (string, error)) Option

WithUpdateTransactionUUID sets the function to generate a new transaction UUID for each packet. This is optional. If not set, the TransactionUUID from the input message is preserved in the output packets.

This is useful for generating a new transaction UUID for each packet in the stream for a request/response protocol. The function should return a new transaction UUID and an error. The error should be nil if the function succeeds.

If an error is returned, the Packetizer will stop processing and return the error, instead of the packet.
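
A generator with the func() (string, error) signature can be written with the standard library alone. The sketch below builds a random (version 4) UUID from crypto/rand; newUUID is a hypothetical name, and a real program might prefer an established UUID library.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// newUUID returns a random version 4 UUID in its canonical text form.
func newUUID() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set the version to 4
	b[8] = (b[8] & 0x3f) | 0x80 // set the RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x",
		b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

func main() {
	id, err := newUUID()
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```

Given the signature documented above, such a function could be passed directly as wrpssp.WithUpdateTransactionUUID(newUUID).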

type Packetizer

type Packetizer struct {
	// contains filtered or unexported fields
}

Packetizer is a struct that reads from a stream and populates a stream of WRP messages.

func New

func New(opts ...Option) (*Packetizer, error)

New creates a new Packetizer with the given options. Similar to io.Reader and io.Writer, the Packetizer is not safe for concurrent use.

func (*Packetizer) Next

func (p *Packetizer) Next(ctx context.Context, msg wrp.Message, validators ...wrp.Processor) (*wrp.Message, error)

Next reads from the byte stream and populates the provided WRP message struct with a new payload and additional SSP headers. It returns the populated WRP message or an error. The error io.EOF is returned when the stream is exhausted. Other errors may be returned if they are encountered during processing.
