zmlink

package module
v0.0.0-...-19b5527 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2022 License: Apache-2.0 Imports: 11 Imported by: 0

README

一个定义了基本交互报文的tcp C/S端链接sdk,简化链接服务构建,使用方式如下方demo

报文结构

注册帧(对使用者透明)

| ZM | 19 | 0 | 1 | clientID |

长度位 类型位 报文编号 载荷
ZM uint32 int8 uint64 string

数据存储类型采用大端字节序

类型位,载荷位用户自定义,但约定发送类型为1126,Ack类型为对应的-1-126

demo

server端

package zmlink

import (
	"testing"
)

type linstenerTestEventHandler struct{}

func (t linstenerTestEventHandler) OnStream(link *Link, payload Payload, dir StreamDir) {
	logrus.Infoln("OnStream-->", link.GetClientID(), string(payload.Bytes()), dir.String())
	if dir == StreamIncoming {
		link.ASend(payload.Ack([]byte("回复点消息拉卡拉绿绿绿绿绿绿绿")), nil)
	}
}

func (t linstenerTestEventHandler) OnStatus(link *Link, statusType StatusType) {
	logrus.Infoln("OnStatus-->", link.GetClientID(), statusType)
}

func TestLink_ASend(t *testing.T) {
	service := NewLinkListener("tcp", ":4469", &linstenerTestEventHandler{})
	service.Start()
	select {}
	/*for{
		time.Sleep(4 *time.Second)
		payload, _ := NewBytesPayload(1, []byte("hahahahah"))
		link := service.GetLink("1235")
		if link != nil {
			send, err := link.SSend(payload, 0)
			logrus.Infoln(string(send.Bytes()), err)
		}
	}*/
}

client端

package zmlink

import (
	"github.com/sirupsen/logrus"
	"testing"
)

type clientTestEventHandler struct{}

func (t clientTestEventHandler) OnStream(link *Link, payload Payload, dir StreamDir) {
	logrus.Infoln("OnStream-->", link.GetClientID(), string(payload.Bytes()), dir.String())
}

func (t clientTestEventHandler) OnStatus(link *Link, statusType StatusType) {
	logrus.Infoln("OnStatus-->", link.GetClientID(), statusType)
}

func TestClientLink_ASend(t *testing.T) {
	service := NewLinkClient("1235", "tcp", ":4469", &clientTestEventHandler{})
	service.Start()
	for {
		time.Sleep(4 * time.Second)
		payload, _ := NewBytesPayload(1, []byte("22ZM222"))
		link := service.GetLink()
		if link == nil {
			logrus.Errorln("关闭了")
			continue
		}
		send, err := link.SSend(payload, 10*time.Second)
		if err != nil {
			logrus.Errorln(err.Error())
			continue
		}
		fmt.Println(string(send.Bytes()))
		err = link.ASend(payload, func(p Payload) {
			logrus.Infoln("clientDataHandler-->", string(p.Bytes()))
		})
		if err != nil {
			logrus.Errorln(err.Error())
			continue
		}
	}
}

Documentation

Index

Constants

View Source
const (
	WriteInterval    = 200 * time.Millisecond
	KeepAliveTimeout = 60 * time.Minute // 2*time.Minute + 2*time.Second

)

Variables

This section is empty.

Functions

func EncodePayLoad

func EncodePayLoad(payload Payload) ([]byte, error)

Types

type BytesPayload

type BytesPayload struct {
	// contains filtered or unexported fields
}

BytesPayload bytes payload

func (*BytesPayload) Ack

func (b *BytesPayload) Ack(data []byte) Payload

func (*BytesPayload) Bytes

func (b *BytesPayload) Bytes() []byte

func (*BytesPayload) GetPkgIdx

func (b *BytesPayload) GetPkgIdx() uint64

func (*BytesPayload) SetPkgIdx

func (b *BytesPayload) SetPkgIdx(i uint64)

func (*BytesPayload) Size

func (b *BytesPayload) Size() uint32

func (*BytesPayload) Type

func (b *BytesPayload) Type() int8

type Client

type Client struct {
	Type    string // tcp4/udp
	Address string // listen address

	sync.Mutex // conneceted 锁
	// contains filtered or unexported fields
}

func NewLinkClient

func NewLinkClient(clientID, address string, linkEventListener LinkEventListener) *Client
func (c *Client) GetLink() *Link

GetLink 获取连接

func (*Client) OnStatus

func (c *Client) OnStatus(link *Link, statusType StatusType)

func (*Client) OnStream

func (c *Client) OnStream(link *Link, payload Payload, dir StreamDir)

func (*Client) Start

func (c *Client) Start() error
type Link struct {
	// contains filtered or unexported fields
}

func (*Link) ASend

func (l *Link) ASend(payload Payload, handler func(Payload)) (err error)

ASend 异步发送数据

func (*Link) AddDataHandler

func (l *Link) AddDataHandler(key string, handler *LinkDataHandler)

AddDataHandler 设置异步数据回调处理接口.

func (*Link) Close

func (l *Link) Close()

Close 接口方法, 关闭链接

func (*Link) GetClientID

func (l *Link) GetClientID() string

func (*Link) GetStatus

func (l *Link) GetStatus() StatusType

func (*Link) RemoveDataHandler

func (l *Link) RemoveDataHandler(key string)

RemoveDataHandler 设置异步数据回调处理接口.

func (*Link) SSend

func (l *Link) SSend(payload Payload, timeout time.Duration) (Payload, error)

SSend 同步发送. 等待一次应答.

func (*Link) SetClientID

func (l *Link) SetClientID(clientID string)

type LinkDataHandler

type LinkDataHandler struct {
	// contains filtered or unexported fields
}

type LinkEventListener

type LinkEventListener interface {
	// OnStream 数据流 (源, 大小, 方向)
	OnStream(*Link, Payload, StreamDir)
	// OnStatus 状态变更(源,状态,附加信息)
	OnStatus(*Link, StatusType)
}

type Listener

type Listener struct {
	Type    string // tcp4/udp
	Address string // listen address

	sync.Mutex // connected 锁
	// contains filtered or unexported fields
}

func NewLinkListener

func NewLinkListener(address string, linkEventListener LinkEventListener) *Listener
func (l *Listener) GetLink(ID string) *Link

GetLink 获取连接

func (*Listener) OnStatus

func (l *Listener) OnStatus(link *Link, statusType StatusType)

func (*Listener) OnStream

func (l *Listener) OnStream(link *Link, payload Payload, dir StreamDir)

func (*Listener) Start

func (l *Listener) Start() error

func (*Listener) Stop

func (l *Listener) Stop() error

Stop 停止监听.

type Payload

type Payload interface {
	Size() uint32
	Bytes() []byte
	Type() int8
	SetPkgIdx(i uint64)
	GetPkgIdx() uint64
	Ack(data []byte) Payload
}

func DecodePayLoad

func DecodePayLoad(data []byte) (Payload, error)

func NewBytesPayload

func NewBytesPayload(t int8, data []byte) (Payload, error)

NewBytesPayload 构建一个 []byte 荷载

type StatusType

type StatusType byte

StatusType 状态类型

const (
	// StatusInit 初始(Conn已实例化)
	StatusInit StatusType = iota
	// StatusError 错误(有异常)
	StatusError
	// StatusReady 就绪(可用)
	StatusReady
	// StatusClosed 已关闭
	StatusClosed
)

func (StatusType) String

func (s StatusType) String() string

StatusTypeString 状态(s)转字串

type StreamDir

type StreamDir byte

StreamDir Stream方向类型

const (
	// StreamIncoming Incoming
	StreamIncoming StreamDir = iota
	// StreamOutgoing Outgoing
	StreamOutgoing
)

func (StreamDir) String

func (dir StreamDir) String() string

StreamDirString streamDir 转 string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL