socket

package
v0.5.1-0...-5006072 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 30, 2017 License: Apache-2.0, Apache-2.0 Imports: 17 Imported by: 0

README

Socket

A concise, powerful and high-performance TCP connection socket.

Feature

  • The server and client are peer-to-peer interfaces
  • Support set the size of socket I/O buffer
  • Support custom communication protocol
  • Support custom transfer filter pipe (Such as gzip, encrypt, verify...)
  • Packet contains both Header and Body
  • Supports custom encoding types, e.g JSON Protobuf
  • Header contains the status code and its description text
  • Each socket is assigned an id
  • Provides Socket Hub, Socket pool and *Packet stack
  • Support setting the size of the reading packet (if exceed disconnect it)

Benchmark

  • Test server configuration
darwin amd64 4CPU 8GB
  • teleport-socket

tp_socket_benchmark

test code

  • contrast rpcx

rpcx_benchmark

test code

  • torch of teleport-socket

tp_socket_torch

svg file

Keyworks

  • Packet: The corresponding structure of the data package
  • Proto: The protocol interface of packet pack/unpack
  • Codec: Serialization interface for Packet.Body
  • XferPipe: A series of pipelines to handle packet data before transfer
  • XferFilter: A interface to handle packet data before transfer

Packet

The contents of every one packet:

// in socket package
type (
	// Packet a socket data packet.
	Packet struct {
		// packet sequence
		seq uint64
		// packet type, such as PULL, PUSH, REPLY
		ptype byte
		// URL string
		uri string
		// metadata
		meta *utils.Args
		// body codec type
		bodyCodec byte
		// body object
		body interface{}
		// newBodyFunc creates a new body by packet type and URI.
		// Note:
		//  only for writing packet;
		//  should be nil when reading packet.
		newBodyFunc NewBodyFunc
		// XferPipe transfer filter pipe, handlers from outer-most to inner-most.
		// Note: the length can not be bigger than 255!
		xferPipe *xfer.XferPipe
		// packet size
		size uint32
		next *Packet
	}

	// NewBodyFunc creates a new body by header info.
	NewBodyFunc func(seq uint64, ptype byte, uri string) interface{}
)

// in xfer package
type (
	// XferPipe transfer filter pipe, handlers from outer-most to inner-most.
	// Note: the length can not be bigger than 255!
	XferPipe struct {
		filters []XferFilter
	}
	// XferFilter handles byte stream of packet when transfer.
	XferFilter interface {
		Id() byte
		OnPack([]byte) ([]byte, error)
		OnUnpack([]byte) ([]byte, error)
	}
)

Protocol

You can customize your own communication protocol by implementing the interface:

type (
	// Proto pack/unpack protocol scheme of socket packet.
	Proto interface {
		// Version returns the protocol's id and name.
		Version() (byte, string)
		// Pack pack socket data packet.
		// Note: Make sure to write only once or there will be package contamination!
		Pack(*Packet) error
		// Unpack unpack socket data packet.
		// Note: Concurrent unsafe!
		Unpack(*Packet) error
	}
	ProtoFunc func(io.ReadWriter) Proto
)

Next, you can specify the communication protocol in the following ways:

func SetDefaultProtoFunc(ProtoFunc)
func GetSocket(net.Conn, ...ProtoFunc) Socket
func NewSocket(net.Conn, ...ProtoFunc) Socket

Demo

server.go
package main

import (
	"log"
	"net"

	"github.com/henrylee2cn/teleport/socket"
	"github.com/henrylee2cn/teleport/socket/example/pb"
)

func main() {
	socket.SetPacketSizeLimit(512)
	lis, err := net.Listen("tcp", "0.0.0.0:8000")
	if err != nil {
		log.Fatalf("[SVR] listen err: %v", err)
	}
	log.Printf("listen tcp 0.0.0.0:8000")
	for {
		conn, err := lis.Accept()
		if err != nil {
			log.Fatalf("[SVR] accept err: %v", err)
		}
		go func(s socket.Socket) {
			log.Printf("accept %s", s.Id())
			defer s.Close()
			var pbTest = new(pb.PbTest)
			for {
				// read request
				var packet = socket.GetPacket(socket.WithNewBody(
					func(seq uint64, ptype byte, uri string) interface{} {
						*pbTest = pb.PbTest{}
						return pbTest
					}),
				)
				err = s.ReadPacket(packet)
				if err != nil {
					log.Printf("[SVR] read request err: %v", err)
					return
				} else {
					// log.Printf("[SVR] read request: %v", packet)
				}

				// write response
				pbTest.A = pbTest.A + pbTest.B
				pbTest.B = pbTest.A - pbTest.B*2
				packet.SetBody(pbTest)

				err = s.WritePacket(packet)
				if err != nil {
					log.Printf("[SVR] write response err: %v", err)
				} else {
					// log.Printf("[SVR] write response: %v", packet)
				}
				socket.PutPacket(packet)
			}
		}(socket.GetSocket(conn))
	}
}
client.go
package main

import (
	"log"
	"net"

	"github.com/henrylee2cn/teleport/codec"
	"github.com/henrylee2cn/teleport/socket"

	"github.com/henrylee2cn/teleport/socket/example/pb"
)

func main() {
	conn, err := net.Dial("tcp", "127.0.0.1:8000")
	if err != nil {
		log.Fatalf("[CLI] dial err: %v", err)
	}
	s := socket.GetSocket(conn)
	defer s.Close()
	var packet = socket.GetPacket()
	defer socket.PutPacket(packet)
	for i := uint64(0); i < 1; i++ {
		// write request
		packet.Reset()
		packet.SetPtype(0)
		packet.SetBodyCodec(codec.ID_JSON)
		packet.SetSeq(i)
		packet.SetUri("/a/b")
		packet.SetBody(&pb.PbTest{A: 10, B: 2})
		err = s.WritePacket(packet)
		if err != nil {
			log.Printf("[CLI] write request err: %v", err)
			continue
		}
		log.Printf("[CLI] write request: %v", packet)

		// read response
		packet.Reset(socket.WithNewBody(
			func(seq uint64, ptype byte, uri string) interface{} {
				return new(pb.PbTest)
			}),
		)
		err = s.ReadPacket(packet)
		if err != nil {
			log.Printf("[CLI] read response err: %v", err)
		} else {
			log.Printf("[CLI] read response: %v", packet)
		}
	}
}

Documentation

Overview

Socket package provides a concise, powerful and high-performance TCP socket.

Copyright 2017 HenryLee. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

This section is empty.

Variables

View Source
var (

	// ErrExceedPacketSizeLimit error
	ErrExceedPacketSizeLimit = errors.New("Size of package exceeds limit.")
)
View Source
var ErrProactivelyCloseSocket = errors.New("socket is closed proactively")

ErrProactivelyCloseSocket proactively close the socket error.

View Source
var (
	ErrProtoUnmatch = errors.New("Mismatched protocol")
)

error

Functions

func GetDefaultBodyCodec

func GetDefaultBodyCodec() codec.Codec

GetDefaultBodyCodec gets the body default codec.

func PacketSizeLimit

func PacketSizeLimit() uint32

PacketSizeLimit gets the packet size upper limit of reading.

func PutPacket

func PutPacket(p *Packet)

PutPacket puts a *Packet to packet stack.

func SetDefaultBodyCodec

func SetDefaultBodyCodec(codecId byte)

SetDefaultBodyCodec set the default header codec. Note:

If the codec.Codec named 'codecId' is not registered, it will panic;
It is not safe to call it concurrently.

func SetDefaultProtoFunc

func SetDefaultProtoFunc(protoFunc ProtoFunc)

SetDefaultProtoFunc sets the default builder of socket communication protocol

func SetPacketSizeLimit

func SetPacketSizeLimit(maxPacketSize uint32)

SetPacketSizeLimit sets max packet size. If maxSize<=0, set it to max uint32.

func SetTCPReadBuffer

func SetTCPReadBuffer(bytes int)

SetReadBuffer sets the size of the operating system's receive buffer associated with the *net.TCP connection. Note: Uses the default value, if bytes=1.

func SetTCPWriteBuffer

func SetTCPWriteBuffer(bytes int)

SetWriteBuffer sets the size of the operating system's transmit buffer associated with the *net.TCP connection. Note: Uses the default value, if bytes=1.

Types

type Body

type Body interface {
	// BodyCodec returns the body codec type id
	BodyCodec() byte
	// SetBodyCodec sets the body codec type id
	SetBodyCodec(bodyCodec byte)
	// Body returns the body object
	Body() interface{}
	// SetBody sets the body object
	SetBody(body interface{})
	// SetNewBody resets the function of geting body.
	SetNewBody(newBodyFunc NewBodyFunc)

	// MarshalBody returns the encoding of body.
	MarshalBody() ([]byte, error)
	// UnmarshalNewBody unmarshal the encoded data to a new body.
	// Note: seq, ptype, uri must be setted already.
	UnmarshalNewBody(bodyBytes []byte) error
	// UnmarshalBody unmarshal the encoded data to the existed body.
	UnmarshalBody(bodyBytes []byte)
}

packet body interface

type FastProto

type FastProto struct {
	// contains filtered or unexported fields
}

FastProto fast socket communication protocol.

func (*FastProto) Pack

func (f *FastProto) Pack(p *Packet) error

Pack pack socket data packet. Note: Make sure to write only once or there will be package contamination!

func (*FastProto) Unpack

func (f *FastProto) Unpack(p *Packet) error

Unpack unpack socket data packet. Note: Concurrent unsafe!

func (*FastProto) Version

func (f *FastProto) Version() (byte, string)

Version returns the protocol's id and name.

type Header interface {
	// Ptype returns the packet sequence
	Seq() uint64
	// SetSeq sets the packet sequence
	SetSeq(uint64)
	// Ptype returns the packet type, such as PULL, PUSH, REPLY
	Ptype() byte
	// Ptype sets the packet type
	SetPtype(byte)
	// Uri returns the URL string string
	Uri() string
	// SetUri sets the packet URL string
	SetUri(string)
	// Meta returns the metadata
	Meta() *utils.Args
	// SetMeta sets the metadata
	SetMeta(*utils.Args)
}

packet header interface

type NewBodyFunc

type NewBodyFunc func(Header) interface{}

NewBodyFunc creates a new body by header.

type Packet

type Packet struct {
	// contains filtered or unexported fields
}

Packet a socket data packet.

func GetPacket

func GetPacket(settings ...PacketSetting) *Packet

GetPacket gets a *Packet form packet stack. Note:

newBodyFunc is only for reading form connection;
settings are only for writing to connection.

func NewPacket

func NewPacket(settings ...PacketSetting) *Packet

NewPacket creates a new *Packet. Note:

NewBody is only for reading form connection;
settings are only for writing to connection.

func (*Packet) AppendXferPipeFrom

func (p *Packet) AppendXferPipeFrom(src *Packet)

AppendXferPipeFrom appends transfer filters from a *Packet.

func (*Packet) Body

func (p *Packet) Body() interface{}

Body returns the body object

func (*Packet) BodyCodec

func (p *Packet) BodyCodec() byte

BodyCodec returns the body codec type id

func (*Packet) MarshalBody

func (p *Packet) MarshalBody() ([]byte, error)

MarshalBody returns the encoding of body.

func (*Packet) Meta

func (p *Packet) Meta() *utils.Args

Meta returns the metadata

func (*Packet) Ptype

func (p *Packet) Ptype() byte

Ptype returns the packet type, such as PULL, PUSH, REPLY

func (*Packet) Reset

func (p *Packet) Reset(settings ...PacketSetting)

Reset resets itself. Note:

newBodyFunc is only for reading form connection;
settings are only for writing to connection.

func (*Packet) Seq

func (p *Packet) Seq() uint64

Ptype returns the packet sequence

func (*Packet) SetBody

func (p *Packet) SetBody(body interface{})

SetBody sets the body object

func (*Packet) SetBodyCodec

func (p *Packet) SetBodyCodec(bodyCodec byte)

SetBodyCodec sets the body codec type id

func (*Packet) SetMeta

func (p *Packet) SetMeta(meta *utils.Args)

SetMeta sets the metadata

func (*Packet) SetNewBody

func (p *Packet) SetNewBody(newBodyFunc NewBodyFunc)

SetNewBody resets the function of geting body.

func (*Packet) SetPtype

func (p *Packet) SetPtype(ptype byte)

Ptype sets the packet type

func (*Packet) SetSeq

func (p *Packet) SetSeq(seq uint64)

SetSeq sets the packet sequence

func (*Packet) SetSize

func (p *Packet) SetSize(size uint32) error

SetSizeAndCheck sets the size of packet. If the size is too big, returns error.

func (*Packet) SetUri

func (p *Packet) SetUri(uri string)

SetUri sets the packet URL string

func (*Packet) Size

func (p *Packet) Size() uint32

Size returns the size of packet.

func (*Packet) String

func (p *Packet) String() string

String returns printing text.

func (*Packet) UnmarshalBody

func (p *Packet) UnmarshalBody(bodyBytes []byte) error

UnmarshalBody unmarshal the encoded data to the existed body.

func (*Packet) UnmarshalNewBody

func (p *Packet) UnmarshalNewBody(bodyBytes []byte) error

UnmarshalNewBody unmarshal the encoded data to a new body. Note: seq, ptype, uri must be setted already.

func (*Packet) Uri

func (p *Packet) Uri() string

Uri returns the URL string string

func (*Packet) XferPipe

func (p *Packet) XferPipe() *xfer.XferPipe

XferPipe returns transfer filter pipe, handlers from outer-most to inner-most. Note: the length can not be bigger than 255!

type PacketSetting

type PacketSetting func(*Packet)

PacketSetting sets Header field.

func WithBody

func WithBody(body interface{}) PacketSetting

WithBody sets the body object

func WithBodyCodec

func WithBodyCodec(bodyCodec byte) PacketSetting

func WithMeta

func WithMeta(meta *utils.Args) PacketSetting

WithMeta sets the metadata

func WithNewBody

func WithNewBody(newBodyFunc NewBodyFunc) PacketSetting

WithNewBody resets the function of geting body.

func WithPtype

func WithPtype(ptype byte) PacketSetting

Ptype sets the packet type

func WithSeq

func WithSeq(seq uint64) PacketSetting

WithSeq sets the packet sequence

func WithUri

func WithUri(uri string) PacketSetting

WithUri sets the packet URL string

func WithXferPipe

func WithXferPipe(filterId ...byte) PacketSetting

WithXferPipe sets transfer filter pipe.

type Proto

type Proto interface {
	// Version returns the protocol's id and name.
	Version() (byte, string)
	// Pack pack socket data packet.
	// Note: Make sure to write only once or there will be package contamination!
	Pack(*Packet) error
	// Unpack unpack socket data packet.
	// Note: Concurrent unsafe!
	Unpack(*Packet) error
}

Proto pack/unpack protocol scheme of socket packet.

type ProtoFunc

type ProtoFunc func(io.ReadWriter) Proto

func DefaultProtoFunc

func DefaultProtoFunc() ProtoFunc

DefaultProtoFunc gets the default builder of socket communication protocol

type Socket

type Socket interface {
	// LocalAddr returns the local network address.
	LocalAddr() net.Addr

	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr

	// SetDeadline sets the read and write deadlines associated
	// with the connection. It is equivalent to calling both
	// SetReadDeadline and SetWriteDeadline.
	//
	// A deadline is an absolute time after which I/O operations
	// fail with a timeout (see type Error) instead of
	// blocking. The deadline applies to all future and pending
	// I/O, not just the immediately following call to Read or
	// Write. After a deadline has been exceeded, the connection
	// can be refreshed by setting a deadline in the future.
	//
	// An idle timeout can be implemented by repeatedly extending
	// the deadline after successful Read or Write calls.
	//
	// A zero value for t means I/O operations will not time out.
	SetDeadline(t time.Time) error

	// SetReadDeadline sets the deadline for future Read calls
	// and any currently-blocked Read call.
	// A zero value for t means Read will not time out.
	SetReadDeadline(t time.Time) error

	// SetWriteDeadline sets the deadline for future Write calls
	// and any currently-blocked Write call.
	// Even if write times out, it may return n > 0, indicating that
	// some of the data was successfully written.
	// A zero value for t means Write will not time out.
	SetWriteDeadline(t time.Time) error

	// WritePacket writes header and body to the connection.
	// Note: must be safe for concurrent use by multiple goroutines.
	WritePacket(packet *Packet) error

	// ReadPacket reads header and body from the connection.
	// Note: must be safe for concurrent use by multiple goroutines.
	ReadPacket(packet *Packet) error

	// Read reads data from the connection.
	// Read can be made to time out and return an Error with Timeout() == true
	// after a fixed time limit; see SetDeadline and SetReadDeadline.
	Read(b []byte) (n int, err error)

	// Write writes data to the connection.
	// Write can be made to time out and return an Error with Timeout() == true
	// after a fixed time limit; see SetDeadline and SetWriteDeadline.
	Write(b []byte) (n int, err error)

	// Close closes the connection socket.
	// Any blocked Read or Write operations will be unblocked and return errors.
	Close() error

	// Public returns temporary public data of Socket.
	Public() goutil.Map
	// PublicLen returns the length of public data of Socket.
	PublicLen() int
	// Id returns the socket id.
	Id() string
	// SetId sets the socket id.
	SetId(string)
}

Socket is a generic stream-oriented network connection.

Multiple goroutines may invoke methods on a Socket simultaneously.

func GetSocket

func GetSocket(c net.Conn, protoFunc ...ProtoFunc) Socket

GetSocket gets a Socket from pool, and reset it.

func NewSocket

func NewSocket(c net.Conn, protoFunc ...ProtoFunc) Socket

NewSocket wraps a net.Conn as a Socket.

type SocketHub

type SocketHub struct {
	// contains filtered or unexported fields
}

SocketHub sockets hub

func NewSocketHub

func NewSocketHub() *SocketHub

NewSocketHub creates a new sockets hub.

func (*SocketHub) ChangeId

func (sh *SocketHub) ChangeId(newId string, socket Socket)

ChangeId changes the socket id. Note: if the old id is remoteAddr, won't delete the index from socketHub.

func (*SocketHub) Delete

func (sh *SocketHub) Delete(id string)

Delete deletes the Socket for a id.

func (*SocketHub) Get

func (sh *SocketHub) Get(id string) (Socket, bool)

Get gets Socket by id. If second returned arg is false, mean the Socket is not found.

func (*SocketHub) Len

func (sh *SocketHub) Len() int

Len returns the length of the socket hub. Note: the count implemented using sync.Map may be inaccurate.

func (*SocketHub) Random

func (sh *SocketHub) Random() (Socket, bool)

Random gets a Socket randomly. If third returned arg is false, mean no Socket is exist.

func (*SocketHub) Range

func (sh *SocketHub) Range(f func(Socket) bool)

Range calls f sequentially for each id and Socket present in the socket hub. If f returns false, range stops the iteration.

func (*SocketHub) Set

func (sh *SocketHub) Set(socket Socket)

Set sets a Socket.

Directories

Path Synopsis
pb
Package pb is a generated protocol buffer package.
Package pb is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL