tcpnetwork

package module
v0.0.0-...-a7babd6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2018 License: MIT Imports: 11 Imported by: 3

README

tcpnetwork

A simple golang tcp server/client.

purpose

Provide a simple model to process multiple connections. You can use tcpnetwork as a server and as a client, then process all events in one output channel, so you need just one routine (maybe the main routine) to process everything.

protobuf support

You can directly use tcpnetwork to send a protobuf message, and set a hook to unserialize your protobuf message.

asynchronously process event

A simple way to process connection events is to handle all of them in one routine by reading the event channel returned by the TCPNetwork's GetEventQueue() method.

server := tcpnetwork.NewTCPNetwork(1024, tcpnetwork.NewStreamProtocol4())
err := server.Listen(kServerAddress)
if nil != err {
	panic(err)
}

for {
	select {
	case evt, ok := <-server.GetEventQueue():
		{
			if !ok {
				return
			}

			switch evt.EventType {
			case tcpnetwork.KConnEvent_Connected:
				{
					log.Println("Client ", evt.Conn.GetRemoteAddress(), " connected")
				}
			case tcpnetwork.KConnEvent_Close:
				{
					log.Println("Client ", evt.Conn.GetRemoteAddress(), " disconnected")
				}
			case tcpnetwork.KConnEvent_Data:
				{
					evt.Conn.Send(evt.Data, 0)
				}
			}
		}
	case <-stopCh:
		{
			return
		}
	}
}

synchronously process event

When you need to process events inside each connection's own routine, you can register a synchronous processing callback by calling the Connection's SetSyncExecuteFunc.

Returning true from this callback means the processed event will not be put in the event queue.

client := tcpnetwork.NewTCPNetwork(1024, tcpnetwork.NewStreamProtocol4())
conn, err := client.Connect(kServerAddress)
conn.SetSyncExecuteFunc(...)

example

  • echo server

     // echo server routine
     func echoServer() (*tcpnetwork.TCPNetwork, error) {
     	var err error
     	server := tcpnetwork.NewTCPNetwork(1024, tcpnetwork.NewStreamProtocol4())
     	err = server.Listen(kServerAddress)
     	if nil != err {
     		return nil, err
     	}
    
     	return server, nil
     }
    
     func routineEchoServer(server *tcpnetwork.TCPNetwork, wg *sync.WaitGroup, stopCh chan struct{}) {
     	defer func() {
     		log.Println("server done")
     		wg.Done()
     	}()
    
     	for {
     		select {
     		case evt, ok := <-server.GetEventQueue():
     			{
     				if !ok {
     					return
     				}
    
     				switch evt.EventType {
     				case tcpnetwork.KConnEvent_Connected:
     					{
     						log.Println("Client ", evt.Conn.GetRemoteAddress(), " connected")
     					}
     				case tcpnetwork.KConnEvent_Close:
     					{
     						log.Println("Client ", evt.Conn.GetRemoteAddress(), " disconnected")
     					}
     				case tcpnetwork.KConnEvent_Data:
     					{
     						evt.Conn.Send(evt.Data, 0)
     					}
     				}
     			}
     		case <-stopCh:
     			{
     				return
     			}
     		}
     	}
     }
    
  • echo client

      // echo client routine
      func echoClient() (*tcpnetwork.TCPNetwork, *tcpnetwork.Connection, error) {
      	var err error
      	client := tcpnetwork.NewTCPNetwork(1024, tcpnetwork.NewStreamProtocol4())
      	conn, err := client.Connect(kServerAddress)
      	if nil != err {
      		return nil, nil, err
      	}
    
      	return client, conn, nil
      }
    
      func routineEchoClient(client *tcpnetwork.TCPNetwork, wg *sync.WaitGroup, stopCh chan struct{}) {
      	defer func() {
      		log.Println("client done")
      		wg.Done()
      	}()
    
      EVENTLOOP:
      	for {
      		select {
      		case evt, ok := <-client.GetEventQueue():
      			{
      				if !ok {
      					return
      				}
      				switch evt.EventType {
      				case tcpnetwork.KConnEvent_Connected:
      					{
      						log.Println("Press any thing")
      						atomic.StoreInt32(&serverConnected, 1)
      					}
      				case tcpnetwork.KConnEvent_Close:
      					{
      						log.Println("Disconnected from server")
      						atomic.StoreInt32(&serverConnected, 0)
      						break EVENTLOOP
      					}
      				case tcpnetwork.KConnEvent_Data:
      					{
      						text := string(evt.Data)
      						log.Println(evt.Conn.GetRemoteAddress(), ":", text)
      					}
      				}
      			}
      		case <-stopCh:
      			{
      				return
      			}
      		}
      	}
      }
    

See more examples in the cmd directory.

license

MIT

Documentation

Index

Constants

View Source
const (
	KConnEvent_None = iota
	KConnEvent_Connected
	KConnEvent_Disconnected
	KConnEvent_Data
	KConnEvent_Pb
	KConnEvent_Close
	KConnEvent_Total
)

All connection event

View Source
const (
	KConnFlag_CopySendBuffer = 1 << iota // Copy the send buffer
	KConnFlag_NoHeader                   // Do not append stream header
)

Send method flag

View Source
const (
	KLogLevelDebug = iota
	KLogLevelInfo
	KLogLevelWarn
	KLogLevelError
	KLogLevelFatal
)

Variables

View Source
var (
	ErrConnIsClosed    = errors.New("Connection is closed")
	ErrConnSendTimeout = errors.New("Connection send timeout")
)

Functions

func SetLogger

func SetLogger(logger ILogger)

SetLogger Set the custom logger

func SetPbUnserializeHook

func SetPbUnserializeHook(hook FuncPbUnserializeHook)

SetPbUnserializeHook set the global protobuf unserialize function

Types

type ConnEvent

type ConnEvent struct {
	EventType int
	Conn      *Connection
	Data      []byte
	Userdata  interface{}
	PbM       proto.Message
}

ConnEvent represents a event occurs on a connection, such as connected, disconnected or data arrived

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection is a wrapper for net.Conn that processes the read and write tasks of the conn. When an event occurs, it calls the eventQueue to dispatch the event.

func (*Connection) ApplyReadDeadline

func (c *Connection) ApplyReadDeadline()

ApplyReadDeadline set the read deadline seconds

func (*Connection) Close

func (c *Connection) Close()

Close the connection, routine safe, send task in the queue will be sent before closing the connection

func (*Connection) Free

func (c *Connection) Free()

Free the connection. When you no longer need the connection to send anything, free it. DO NOT call it from multiple routines.

func (*Connection) GetConn

func (c *Connection) GetConn() net.Conn

GetConn get the raw net.Conn interface

func (*Connection) GetConnId

func (c *Connection) GetConnId() int

GetConnId get the connection's id

func (*Connection) GetLocalAddress

func (c *Connection) GetLocalAddress() string

GetLocalAddress return the local address of the connection

func (*Connection) GetReadTimeoutSec

func (c *Connection) GetReadTimeoutSec() int

GetReadTimeoutSec get the read deadline for the connection

func (*Connection) GetRemoteAddress

func (c *Connection) GetRemoteAddress() string

GetRemoteAddress return the remote address of the connection

func (*Connection) GetStatus

func (c *Connection) GetStatus() int32

GetStatus get the connection's status

func (*Connection) GetUnpacker

func (c *Connection) GetUnpacker() IUnpacker

GetUnpacker you can get the unpacker you set

func (*Connection) GetUserdata

func (c *Connection) GetUserdata() interface{}

GetUserdata get the userdata you set

func (*Connection) ResetReadDeadline

func (c *Connection) ResetReadDeadline()

ResetReadDeadline reset the read deadline

func (*Connection) Send

func (c *Connection) Send(msg []byte, f int64) error

Send the buffer with KConnFlag flag

func (*Connection) SendPb

func (c *Connection) SendPb(pb proto.Message, f int64) error

SendPb send protocol buffer message

func (*Connection) SetConnId

func (c *Connection) SetConnId(id int)

SetConnId set the connection's id

func (*Connection) SetReadTimeoutSec

func (c *Connection) SetReadTimeoutSec(sec int)

SetReadTimeoutSec set the read deadline for the connection

func (*Connection) SetSyncExecuteFunc

func (c *Connection) SetSyncExecuteFunc(fn FuncSyncExecute) FuncSyncExecute

SetSyncExecuteFunc sets a callback that lets you synchronously process the event in every connection's event routine. If the callback function returns true, the event will not be dispatched.

func (*Connection) SetUnpacker

func (c *Connection) SetUnpacker(unpacker IUnpacker)

SetUnpacker you can set a custom binary stream unpacker on the connection

func (*Connection) SetUserdata

func (c *Connection) SetUserdata(ud interface{})

SetUserdata set the userdata you need

type FuncPbUnserializeHook

type FuncPbUnserializeHook func([]byte) proto.Message

FuncPbUnserializeHook is a function to unserialize binary data to protobuf message

type FuncSyncExecute

type FuncSyncExecute func(*ConnEvent) bool

Sync event callback. If it returns true, this event will not be sent to the event channel. If it returns false, this event will be sent to the event channel again.

type IEventHandler

type IEventHandler interface {
	OnConnected(evt *ConnEvent)
	OnDisconnected(evt *ConnEvent)
	OnRecv(evt *ConnEvent)
}

IEventHandler is callback interface to process connection's event

type IEventQueue

type IEventQueue interface {
	Push(*ConnEvent)
	Pop() *ConnEvent
}

IEventQueue queues all connection's events

type ILogger

type ILogger interface {
	Output(level int, calldepth int, f string) error
}

ILogger is an interface use for log message

type IStreamProtocol

type IStreamProtocol interface {
	// Init
	Init()
	// Get the header length of the stream
	GetHeaderLength() uint32
	// Read the header length of the stream
	UnserializeHeader([]byte) uint32
	// Format header
	SerializeHeader([]byte) []byte
}

IStreamProtocol implement the protocol of the binary data stream for unpacking packet

type IUnpacker

type IUnpacker interface {
	Unpack(*Connection, []byte) ([]byte, error)
}

IUnpacker unpack the binary stream to replace the internal unpack process

type StreamProtocol2

type StreamProtocol2 struct {
}

StreamProtocol2 implements a simple binary stream protocol with a 2-byte header as the packet length. Binary format: | 2 bytes (total stream length) | data ... (total stream length - 2) |

stream protocol interface for 2 bytes header

func NewStreamProtocol2

func NewStreamProtocol2() *StreamProtocol2

func (*StreamProtocol2) GetHeaderLength

func (s *StreamProtocol2) GetHeaderLength() uint32

func (*StreamProtocol2) Init

func (s *StreamProtocol2) Init()

func (*StreamProtocol2) SerializeHeader

func (s *StreamProtocol2) SerializeHeader(body []byte) []byte

func (*StreamProtocol2) UnserializeHeader

func (s *StreamProtocol2) UnserializeHeader(buf []byte) uint32

type StreamProtocol4

type StreamProtocol4 struct {
}

StreamProtocol4 implements a simple binary stream protocol with a 4-byte header as the packet length. Binary format: | 4 bytes (total stream length) | data ... (total stream length - 4) |

implement default stream protocol
stream protocol interface for 4 bytes header

func NewStreamProtocol4

func NewStreamProtocol4() *StreamProtocol4

NewStreamProtocol4 creates a StreamProtocol4

func (*StreamProtocol4) GetHeaderLength

func (s *StreamProtocol4) GetHeaderLength() uint32

GetHeaderLength return the header length

func (*StreamProtocol4) Init

func (s *StreamProtocol4) Init()

Init inits the protocol

func (*StreamProtocol4) SerializeHeader

func (s *StreamProtocol4) SerializeHeader(body []byte) []byte

func (*StreamProtocol4) UnserializeHeader

func (s *StreamProtocol4) UnserializeHeader(buf []byte) uint32

type TCPNetwork

type TCPNetwork struct {
	Conf TCPNetworkConf
	// contains filtered or unexported fields
}

TCPNetwork manages all server and client connections

func NewTCPNetwork

func NewTCPNetwork(eventQueueSize int, sp IStreamProtocol) *TCPNetwork

NewTCPNetwork creates a TCPNetwork object

func (*TCPNetwork) Connect

func (t *TCPNetwork) Connect(addr string) (*Connection, error)

Connect the remote server

func (*TCPNetwork) DisconnectAllConnectionsClient

func (t *TCPNetwork) DisconnectAllConnectionsClient()

DisconnectAllConnectionsClient disconnect all connections on client side

func (*TCPNetwork) DisconnectAllConnectionsServer

func (t *TCPNetwork) DisconnectAllConnectionsServer()

DisconnectAllConnectionsServer disconnect all connections on server side

func (*TCPNetwork) GetEventQueue

func (t *TCPNetwork) GetEventQueue() <-chan *ConnEvent

GetEventQueue get the event queue channel

func (*TCPNetwork) GetReadTimeoutSec

func (t *TCPNetwork) GetReadTimeoutSec() int

GetReadTimeoutSec returns the read timeout seconds of current TCPNetwork

func (*TCPNetwork) GetStreamProtocol

func (t *TCPNetwork) GetStreamProtocol() IStreamProtocol

GetStreamProtocol returns the stream protocol of current TCPNetwork

func (*TCPNetwork) Listen

func (t *TCPNetwork) Listen(addr string) error

Listen an address to accept client connection

func (*TCPNetwork) Pop

func (t *TCPNetwork) Pop() *ConnEvent

Pop the event in event queue

func (*TCPNetwork) Push

func (t *TCPNetwork) Push(evt *ConnEvent)

Push implements the IEventQueue interface

func (*TCPNetwork) ServeWithHandler

func (t *TCPNetwork) ServeWithHandler(handler IEventHandler)

ServeWithHandler process all events in the event queue and dispatch to the IEventHandler

func (*TCPNetwork) SetReadTimeoutSec

func (t *TCPNetwork) SetReadTimeoutSec(sec int)

SetReadTimeoutSec sets the read timeout seconds of current TCPNetwork

func (*TCPNetwork) SetStreamProtocol

func (t *TCPNetwork) SetStreamProtocol(sp IStreamProtocol)

SetStreamProtocol sets the stream protocol of current TCPNetwork

func (*TCPNetwork) Shutdown

func (t *TCPNetwork) Shutdown()

Shutdown frees all connection and stop the listener

type TCPNetworkConf

type TCPNetworkConf struct {
	SendBufferSize int
}

TCPNetworkConf config the TCPNetwork

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL