multiplex

package module
v0.0.0-...-e565e6f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2014 License: BSD-3-Clause Imports: 6 Imported by: 0

README

Multiplexed streams for Go Build Status

Package multiplex provides multiplexed streams over a single underlying transport io.ReadWriteCloser.

Any system that requires a large number of independent TCP connections could benefit from this package by instead having each client maintain a single multiplexed connection. Creating new channels, or keeping a large number of channels open, costs very little, which makes the package ideal for long-term waiting.

An interesting side-effect of this multiplexing is that once the underlying connection has been established, each end of the connection can both Accept() and Dial(). This allows for elegant push notifications and other interesting approaches.

Documentation

Can be found on godoc.org or below.

Example Server
ln, err := net.Listen("tcp", ":1234")
for {
    conn, err := ln.Accept()
    go func(conn net.Conn) {
        mx := multiplex.MultiplexedServer(conn)
        for {
            c, err := mx.Accept()
            go handleConnection(c)
        }
    }(conn)
}
Example Client

Connect to a server with a single TCP connection, then create 10K channels over it and write "hello" to each.

conn, err := net.Dial("tcp", "127.0.0.1:1234")
mx := multiplex.MultiplexedClient(conn)

for i := 0; i < 10000; i++ {
    go func() {
        c, err := mx.Dial()
        n, err := c.Write([]byte("hello"))
        c.Close()
    }()
}

Usage

const (
	SYN = 1 << iota
	RST = 1 << iota
)

Packet flags.

const (
	// FragmentSize (in bytes) of packet fragments.
	FragmentSize = 1024
)
var (
	// ErrInvalidChannel is returned when an attempt is made to write to an invalid channel.
	ErrInvalidChannel = errors.New("invalid channel")
)
type Channel
type Channel struct {
}

A Channel managed by the multiplexer.

func (*Channel) Close
func (c *Channel) Close() error

Close a multiplexed channel.

func (*Channel) Read
func (c *Channel) Read(b []byte) (int, error)

Read bytes from a multiplexed channel.

func (*Channel) Write
func (c *Channel) Write(b []byte) (int, error)

Write bytes to a multiplexed channel. The underlying implementation will fragment the payload into FragmentSize chunks to prevent starvation of other channels.

type MultiplexedStream
type MultiplexedStream struct {
}
func MultiplexedClient
func MultiplexedClient(conn io.ReadWriteCloser) *MultiplexedStream

MultiplexedClient creates a new multiplexed client-side stream.

func MultiplexedServer
func MultiplexedServer(conn io.ReadWriteCloser) *MultiplexedStream

MultiplexedServer creates a new multiplexed server-side stream.

func (*MultiplexedStream) Accept
func (m *MultiplexedStream) Accept() (*Channel, error)
func (*MultiplexedStream) Close
func (m *MultiplexedStream) Close() error
func (*MultiplexedStream) Dial
func (m *MultiplexedStream) Dial() (*Channel, error)

Dial the remote end, creating a new multiplexed channel.

Documentation

Overview

Package multiplex provides multiplexed streams over a single underlying transport `io.ReadWriteCloser`.

Any system that requires a large number of independent TCP connections could benefit from this package by instead having each client maintain a single multiplexed connection. Creating new channels, or keeping a large number of channels open, costs very little, which makes the package ideal for long-term waiting.

An interesting side-effect of this multiplexing is that once the underlying connection has been established, each end of the connection can both `Accept()` and `Dial()`. This allows for elegant push notifications and other interesting approaches.

Documentation

Can be found on [godoc.org](http://godoc.org/github.com/alecthomas/multiplex) or below.

Example Server

ln, err := net.Listen("tcp", ":1234")
for {
    conn, err := ln.Accept()
    go func(conn net.Conn) {
        mx := multiplex.MultiplexedServer(conn)
        for {
            c, err := mx.Accept()
            go handleConnection(c)
        }
    }(conn)
}

Example Client

Connect to a server with a single TCP connection, then create 10K channels over it and write "hello" to each.

conn, err := net.Dial("tcp", "127.0.0.1:1234")
mx := multiplex.MultiplexedClient(conn)

for i := 0; i < 10000; i++ {
    go func() {
        c, err := mx.Dial()
        n, err := c.Write([]byte("hello"))
        c.Close()
    }()
}

Index

Examples

Constants

View Source
const (
	SYN = 1 << iota
	RST = 1 << iota
)

Packet flags.

View Source
const (
	// FragmentSize (in bytes) of packet fragments.
	FragmentSize = 1024
)

Variables

View Source
var (
	// ErrInvalidChannel is returned when an attempt is made to write to an invalid channel.
	ErrInvalidChannel = errors.New("invalid channel")
)

Functions

This section is empty.

Types

type Channel

type Channel struct {
	// contains filtered or unexported fields
}

A Channel managed by the multiplexer.

func (*Channel) Close

func (c *Channel) Close() error

Close a multiplexed channel.

func (*Channel) Read

func (c *Channel) Read(b []byte) (int, error)

Read bytes from a multiplexed channel.

func (*Channel) Write

func (c *Channel) Write(b []byte) (int, error)

Write bytes to a multiplexed channel. The underlying implementation will fragment the payload into FragmentSize chunks to prevent starvation of other channels.

type MultiplexedStream

type MultiplexedStream struct {
	// contains filtered or unexported fields
}

func MultiplexedClient

func MultiplexedClient(conn io.ReadWriteCloser) *MultiplexedStream

MultiplexedClient creates a new multiplexed client-side stream.

Example
conn, err := net.Dial("tcp", "127.0.0.1:1234")
if err != nil {
	log.Fatal(err)
}
mx := MultiplexedClient(conn)

// Create 10000 multiplexed connections over the socket and send "hello".
for i := 0; i < 10000; i++ {
	go func() {
		c, err := mx.Dial()
		if err != nil {
			log.Fatal(err)
		}
		defer c.Close()

		_, err = c.Write([]byte("hello"))
		if err != nil {
			log.Fatal(err)
		}
	}()
}
Output:

func MultiplexedServer

func MultiplexedServer(conn io.ReadWriteCloser) *MultiplexedStream

MultiplexedServer creates a new multiplexed server-side stream.

Example
ln, err := net.Listen("tcp", ":1234")
if err != nil {
	log.Fatal(err)
}

// Network accept loop.
for {
	conn, err := ln.Accept()
	if err != nil {
		log.Fatal(err)
	}

	// Multiplexer accept loop.
	go func(conn net.Conn) {
		mx := MultiplexedServer(conn)
		for {
			c, err := mx.Accept()
			if err != nil {
				log.Fatal(err)
			}

			// Channel handler.
			go func(c *Channel) {
				defer c.Close()

				// Read "hello" from client.
				b := make([]byte, 5)
				_, err := c.Read(b)
				if err != nil {
					log.Fatal(err)
				}

				fmt.Printf("Received: %s\n", b)
			}(c)
		}
	}(conn)
}
Output:

func (*MultiplexedStream) Accept

func (m *MultiplexedStream) Accept() (*Channel, error)

func (*MultiplexedStream) Close

func (m *MultiplexedStream) Close() error

func (*MultiplexedStream) Dial

func (m *MultiplexedStream) Dial() (*Channel, error)

Dial the remote end, creating a new multiplexed channel.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL