stream

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 4, 2014 License: GPL-3.0-or-later Imports: 5 Imported by: 12

Documentation

Overview

Package stream wraps a TCP/IP network connection with the Go gob en/decoder.

Note, in case of a serialization error (encoding or decoding failure), it is assumed that there is either a protocol mismatch between the parties, or an implementation bug; but in any case, the connection is deemed failed and is terminated.

Example (Usage)

Stream example of an echo server and client using streams.

// Iris - Decentralized cloud messaging
// Copyright (c) 2013 Project Iris. All rights reserved.
//
// Iris is dual licensed: you can redistribute it and/or modify it under the
// terms of the GNU General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later
// version.
//
// The framework is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
// more details.
//
// Alternatively, the Iris framework may be used in accordance with the terms
// and conditions contained in a signed written agreement between you and the
// author(s).

package main

import (
	"fmt"
	"net"
	"time"

	"github.com/project-iris/iris/proto/stream"
)

var host = "localhost"
var port = 55555

// Stream example of an echo server and client using streams.
func main() {
	live := make(chan struct{})
	quit := make(chan struct{})
	data := make(chan string)
	msg := "Hello Stream!"

	go server(live, quit)
	<-live
	go client(msg, data)

	fmt.Println("Input message:", msg)
	fmt.Println("Output message:", <-data)

	close(quit)
}

func server(live, quit chan struct{}) {
	// Open a TCP port to accept incoming stream connections
	addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", host, port))
	if err != nil {
		fmt.Println("Failed to resolve local address:", err)
		return
	}
	sock, err := stream.Listen(addr)
	if err != nil {
		fmt.Println("Failed to listen for incoming streams:", err)
		return
	}
	sock.Accept(time.Second)
	live <- struct{}{}

	// While not exiting, process stream connections
	for {
		select {
		case <-quit:
			if err = sock.Close(); err != nil {
				fmt.Println("Failed to terminate stream listener:", err)
			}
			return
		case strm := <-sock.Sink:
			defer strm.Close()

			// Receive and echo back a string
			var data string
			if err = strm.Recv(&data); err != nil {
				fmt.Println("Failed to receive a string object:", err)
				continue
			}
			if err = strm.Send(&data); err != nil {
				fmt.Println("Failed to send back a string object:", err)
				continue
			}
			if err = strm.Flush(); err != nil {
				fmt.Println("Failed to flush the response:", err)
				return
			}
		}
	}
}

func client(msg string, ch chan string) {
	// Open a TCP connection to the stream server
	addr := fmt.Sprintf("%s:%d", host, port)
	strm, err := stream.Dial(addr, time.Second)
	if err != nil {
		fmt.Println("Failed to connect to stream server:", err)
		return
	}
	defer strm.Close()

	// Send the message and receive a reply
	if err = strm.Send(msg); err != nil {
		fmt.Println("Failed to send the message:", err)
		return
	}
	if err = strm.Flush(); err != nil {
		fmt.Println("Failed to flush the message:", err)
		return
	}
	if err = strm.Recv(&msg); err != nil {
		fmt.Println("Failed to receive the reply:", err)
		return
	}
	// Return the reply to the caller and terminate
	ch <- msg
}
Output:

Input message: Hello Stream!
Output message: Hello Stream!

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Listener

type Listener struct {
	Sink chan *Stream // Channel receiving the accepted connections
	// contains filtered or unexported fields
}

Stream listener to accept inbound connections.

func Listen

func Listen(addr *net.TCPAddr) (*Listener, error)

Opens a TCP server socket and returns a stream listener, ready to accept. If an auto-port (0) is requested, the port is updated in the argument.

func (*Listener) Accept

func (l *Listener) Accept(timeout time.Duration)

Starts the stream connection accepter, with a maximum timeout to wait for an established connection to be handled.

func (*Listener) Close

func (l *Listener) Close() error

Terminates the acceptor and returns any encountered errors.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

TCP/IP based stream with a gob encoder on top.

func Dial

func Dial(address string, timeout time.Duration) (*Stream, error)

Connects to a remote host and returns the connection stream.

func (*Stream) Close

func (s *Stream) Close() error

Closes the underlying network connection of a stream.

func (*Stream) Flush

func (s *Stream) Flush() error

Flushes the outbound socket. In case of an error, the network stream is torn down.

func (*Stream) Recv

func (s *Stream) Recv(data interface{}) error

Receives a gob of the given type and returns it. If an error occurs, the network stream is torn down.

func (*Stream) Send

func (s *Stream) Send(data interface{}) error

Serializes an object and sends it over the wire. In case of an error, the connection is torn down.

func (*Stream) Sock

func (s *Stream) Sock() *net.TCPConn

Retrieves the raw connection object if special manipulations are needed.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL