streammux

package module
v0.0.0-...-fffd9f0 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 13, 2022 License: Apache-2.0 Imports: 8 Imported by: 0

README

About

Simple binary muxer over an arbitrary pair of streams.

Created mostly for simple IPC between Go processes.

Documentation

Overview

Package streammux implements a very simple muxer over a pair of streams. It is meant for very specific situations where the user can guarantee stable streams. Its main purpose is to provide a low-cost muxer for inter-process communication over stdio.


Constants

const HeaderSize = 16

HeaderSize represents the size of the header sent through the pipe.

Variables

var (
	// ErrInvalidMessage is returned when an unknown message type was detected
	ErrInvalidMessage = errors.New("invalid message")
	// ErrInvalidResponse indicates that the client responded with malformed or unknown message
	ErrInvalidResponse error = errors.New("invalid response")
	// ErrMessageReadTimeout is returned when read timed out
	ErrMessageReadTimeout = errors.New("incoming message read tiemd out")
	// ErrMessageWriteTimeout is returned when write timed out
	ErrMessageWriteTimeout = errors.New("incoming message write tiemd out")
)
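
Because these are sentinel errors, callers can match them with errors.Is. The following is a minimal sketch, not part of the package: the local variables stand in for the exported streammux errors so the snippet compiles on its own, and treating timeouts as retryable is an assumed policy chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for streammux.ErrInvalidMessage,
// streammux.ErrMessageReadTimeout and streammux.ErrMessageWriteTimeout.
var (
	errInvalidMessage = errors.New("invalid message")
	errReadTimeout    = errors.New("read timed out")
	errWriteTimeout   = errors.New("write timed out")
)

// retryable is a hypothetical helper: it reports whether err is, or wraps,
// one of the timeout sentinels. Whether a timeout should be retried is an
// assumption of this sketch, not something the package prescribes.
func retryable(err error) bool {
	return errors.Is(err, errReadTimeout) || errors.Is(err, errWriteTimeout)
}

func main() {
	wrapped := fmt.Errorf("request 3: %w", errReadTimeout)
	fmt.Println(retryable(wrapped), retryable(errInvalidMessage))
}
```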
var ByteOrder = binary.LittleEndian

ByteOrder represents the byte order used in communication.
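
To show how HeaderSize and ByteOrder could fit together, here is a minimal sketch that packs and unpacks a 16-byte little-endian header. The field layout (an 8-byte message id followed by an 8-byte body size) and the names header, encodeHeader and decodeHeader are assumptions made for illustration; the package's actual wire format is not documented here.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// headerSize and byteOrder mirror streammux.HeaderSize and streammux.ByteOrder.
const headerSize = 16

var byteOrder = binary.LittleEndian

// header is a hypothetical layout, not the package's real one.
type header struct {
	ID   uint64 // assumed: message identifier
	Size uint64 // assumed: length of the body that follows
}

// encodeHeader writes both fields into a fixed-size little-endian buffer.
func encodeHeader(h header) []byte {
	buf := make([]byte, headerSize)
	byteOrder.PutUint64(buf[0:8], h.ID)
	byteOrder.PutUint64(buf[8:16], h.Size)
	return buf
}

// decodeHeader reverses encodeHeader and rejects buffers that are too short.
func decodeHeader(buf []byte) (header, error) {
	if len(buf) < headerSize {
		return header{}, errors.New("short header")
	}
	return header{
		ID:   byteOrder.Uint64(buf[0:8]),
		Size: byteOrder.Uint64(buf[8:16]),
	}, nil
}

func main() {
	raw := encodeHeader(header{ID: 7, Size: 1024})
	h, err := decodeHeader(raw)
	fmt.Println(len(raw), h.ID, h.Size, err)
}
```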

Functions

This section is empty.

Types

type Demuxer

type Demuxer struct {
	// BodyChunkSize represents how much per socket is allocated for body buffering
	BodyChunkSize int
	// BodyBufferSize represents maximum space that can be retained by body buffers, defaults to 256KB
	BodyBufferSize int
	// Reader from which demuxer reads incoming messages. Defaults to os.Stdin
	Reader io.Reader

	// Writer to which responses are written. Defaults to os.Stdout
	Writer io.Writer

	// Handler is a user message handler
	Handler Handler
	// contains filtered or unexported fields
}

Demuxer handles incoming messages dispatching them to their handler.

It is the responsibility of the Muxer to make sure that no concurrent messages share the same id until all read and write operations for that id are done.

func (*Demuxer) Listen

func (d *Demuxer) Listen() error

Listen starts listening for messages

type ErrorMessage

type ErrorMessage struct {
	// contains filtered or unexported fields
}

ErrorMessage represents an error sent by the other end

type Handler

type Handler interface {
	Handle(io.Reader) (io.Reader, error)
}

Handler is an interface which must be implemented by user message handlers
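
A handler only needs the single Handle method. The sketch below is an illustration rather than code from the package: the Handler interface is redeclared locally so the snippet compiles on its own, and upperHandler and handleString are hypothetical names.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// Handler mirrors the streammux.Handler interface shown above.
type Handler interface {
	Handle(io.Reader) (io.Reader, error)
}

// upperHandler is a hypothetical handler that replies with the
// upper-cased request body.
type upperHandler struct{}

func (upperHandler) Handle(r io.Reader) (io.Reader, error) {
	body, err := io.ReadAll(r)
	if err != nil {
		return nil, err
	}
	return bytes.NewReader(bytes.ToUpper(body)), nil
}

// handleString is a hypothetical helper that feeds a string to a handler
// and collects the whole reply, which is handy in tests.
func handleString(h Handler, req string) (string, error) {
	resp, err := h.Handle(strings.NewReader(req))
	if err != nil {
		return "", err
	}
	out, err := io.ReadAll(resp)
	return string(out), err
}

func main() {
	out, err := handleString(upperHandler{}, "ping")
	fmt.Println(out, err)
}
```

With the real package, a value such as upperHandler{} would be assigned to the Handler field of a Demuxer before calling Listen.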

type Muxer

type Muxer struct {
	// BodyChunkSize represents how much per socket is allocated for body buffering
	BodyChunkSize int
	// MaxConcurrent represents the maximum number of in-progress messages. Defaults to 256.
	MaxConcurrent uint16
	// Reader that is a source of data for multiplexer.
	Reader io.Reader

	// Writer that is a target for data for multiplexer.
	Writer io.Writer
	// contains filtered or unexported fields
}

Muxer is a simple multiplexer that allows raw binary transport over a reader and writer pair. It implements internal bookkeeping to match each response to the user request that triggered it.
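
The package does not expose its bookkeeping, so the following is only a sketch of one way such tracking could work, under stated assumptions: every in-flight request gets an id that no other live request uses, the number of live ids is capped (similar in spirit to MaxConcurrent), and a response is routed to its waiter by id. The pending type and its methods are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// pending is a hypothetical tracker of in-flight requests.
type pending struct {
	mu      sync.Mutex
	max     uint16
	next    uint16
	waiters map[uint16]chan []byte
}

func newPending(max uint16) *pending {
	return &pending{max: max, waiters: make(map[uint16]chan []byte)}
}

// register reserves an unused id and returns a channel for its response.
func (p *pending) register() (uint16, <-chan []byte, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.waiters) >= int(p.max) {
		return 0, nil, errors.New("too many in-progress messages")
	}
	// Skip ids that are still in use so two live requests never share one.
	// A free id always exists because max fits in uint16.
	for {
		if _, busy := p.waiters[p.next]; !busy {
			break
		}
		p.next++
	}
	id := p.next
	p.next++
	ch := make(chan []byte, 1)
	p.waiters[id] = ch
	return id, ch, nil
}

// resolve delivers a response to the waiter for id and frees the id.
// It reports false when no request with that id is in flight.
func (p *pending) resolve(id uint16, body []byte) bool {
	p.mu.Lock()
	ch, ok := p.waiters[id]
	delete(p.waiters, id)
	p.mu.Unlock()
	if ok {
		ch <- body
	}
	return ok
}

func main() {
	p := newPending(256)
	id, ch, _ := p.register()
	p.resolve(id, []byte("response"))
	fmt.Println(id, string(<-ch))
}
```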

Example (BytePipe)
package main

import (
	"fmt"

	"github.com/graphql-editor/streammux"
)

func main() {
	var m streammux.Muxer
	resp, err := m.DoByte([]byte("message"))
	fmt.Println(string(resp))
	fmt.Println(err)
}

func (*Muxer) Do

func (m *Muxer) Do(dst io.Writer, src io.Reader) error

Do performs a request over the pipe and returns a response by reading data from the reader. This operation requires an additional buffer to be allocated, so it is slower than DoByte and DoSize. If the size of the data to be read is known ahead of time, use DoSize.
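
As a rough illustration of calling a method with this signature, the sketch below wraps it in a string-based helper. It assumes, based only on the parameter names, that src supplies the request and dst receives the response. The doer interface, doString and the echo stand-in are hypothetical; echo replaces a real Muxer so the snippet runs without a peer process.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// doer captures only the Do method from the signature above.
type doer interface {
	Do(dst io.Writer, src io.Reader) error
}

// doString is a hypothetical convenience wrapper: it sends req as the
// request body and returns whatever was written to dst as a string.
func doString(d doer, req string) (string, error) {
	var resp bytes.Buffer
	if err := d.Do(&resp, strings.NewReader(req)); err != nil {
		return "", err
	}
	return resp.String(), nil
}

// echo is a stand-in for a real Muxer; it copies the request
// straight into the response.
type echo struct{}

func (echo) Do(dst io.Writer, src io.Reader) error {
	_, err := io.Copy(dst, src)
	return err
}

func main() {
	out, err := doString(echo{}, "message")
	fmt.Println(out, err)
}
```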

func (*Muxer) DoByte

func (m *Muxer) DoByte(req []byte) ([]byte, error)

DoByte performs a request over the pipe and returns the response.
