transport

package
v0.0.0-...-2121d46 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
// Package-wide limits and timeouts. The four timing values are expressed in
// terms of time.Second and are therefore time.Duration constants; the rest
// are untyped integer constants.
const (
	BatchLimit     = 100
	PingTimeOut    = time.Second * 30
	AckInterval    = time.Second * 5
	// 1048576 == 1 << 20. NOTE(review): presumably a byte count (1 MiB) — confirm against the codec.
	MaxTransPacket = 1048576
	MaxRetried     = 5
	HeadLen        = 4 // number of bytes in the packet header
	ReadTimeout    = time.Second * 60
	WriteTimeout   = time.Second * 5
)
View Source
// Status values, numbered consecutively from 0 by iota.
// NOTE(review): presumably the states passed to TcpClient.StateCompareAndSwap — confirm.
const (
	StatusConnecting = iota // 0
	StatusConnected         // 1
	StatusDisconnected      // 2
)
View Source
// Explicitly numbered type values; no constant in this group is assigned 0.
// NOTE(review): what these values classify is not visible in this listing — confirm at the use sites.
const (
	TypeWorking = 1
	TypeStopped = 2
)
View Source
// Size constants.
// NOTE(review): presumably ZMinLen is the minimum body length at which gzip is
// applied, and GzippedSize is the width in bytes of the gzipped flag in the
// "size | gzipped | body" layout described on EncodeBody — confirm in the codec source.
const (
	ZMinLen     = 200
	GzippedSize = 1
)
View Source
// Frame type values, numbered consecutively from 0 by iota.
// NOTE(review): presumably the values carried in Frame.Type — confirm.
const (
	FrameRaw = iota // 0
	FrameReplyRaw // 1
	FrameStreamHeartbeat // 2
	FrameStartStream // 3
	FrameCleanStream // 4

	// The blank line above does not reset iota, so this value is 5.
	FrameCliHalfClosedAck
)

Variables

This section is empty.

Functions

func BuildControlBuffer

func BuildControlBuffer(buf *ControlBuffer, max uint32)

func NewBodyPool

func NewBodyPool(size uint32)

Types

type Buffer

// Buffer is a struct wrapper around a single byte slice.
type Buffer struct {
	// Bytes is the wrapped byte slice.
	Bytes []byte
}

type ConnOption

// ConnOption is the option struct accepted (by pointer) by NewTcpServer.
type ConnOption struct {
	// NOTE(review): presumably the maximum accepted size of an incoming packet — confirm units.
	MaxIncomingPacket uint32
	// NOTE(review): presumably the receive buffer size used for each stream — confirm.
	StreamRecvBufSize int
	// StreamHandler is a callback that receives a server transport and a stream.
	// NOTE(review): presumably invoked once per newly started stream — confirm.
	StreamHandler     func(st IServerTransport, s *Stream)
}

type ControlBuffer

type ControlBuffer struct {
	// contains filtered or unexported fields
}

func NewControlBuffer

func NewControlBuffer(max uint32, _sw *SenderWrapper) *ControlBuffer

func (*ControlBuffer) Kick

func (ins *ControlBuffer) Kick()

func (*ControlBuffer) OnClose

func (ins *ControlBuffer) OnClose()

OnClose TODO: Is it reasonable to reject stream input immediately?

func (*ControlBuffer) Run

func (ins *ControlBuffer) Run(_sw *SenderWrapper)

func (*ControlBuffer) Set

func (ins *ControlBuffer) Set(buf packet.IPacket) error

type DialOption

// DialOption is the option struct accepted by NewTcpClient and
// TcpClient.DialWithOps.
type DialOption struct {
	RemoteNodeId      string
	CurrentNodeId     string
	RemoteAddr        string
	// NOTE(review): presumably the maximum accepted size of an incoming packet — confirm units.
	MaxIncomingPacket uint32
	IsBlock           bool
	// NOTE(review): presumably enables gzip in the codec (see NewTcpCodec's _isGzip) — confirm.
	IsGzip            bool
	// DisconnectHandler is a callback that receives a node ID string.
	// NOTE(review): presumably called when the connection to that node is lost — confirm.
	DisconnectHandler func(nodeId string)
}

type Frame

// Frame is the unit that Framer.Decode produces from a packet.IPacket and
// that Framer.Encode turns back into one.
type Frame struct {
	// NOTE(review): presumably one of the Frame* constants — confirm.
	Type     int8
	End      bool
	// StreamId has the same type (int64) as the IDs returned by Stream.Id.
	StreamId int64
	Data     packet.IPacket
}

type Framer

// Framer is a field-less type whose Encode and Decode methods convert between
// Frame and packet.IPacket.
type Framer struct{}

func (*Framer) Decode

func (f *Framer) Decode(data packet.IPacket) (Frame, error)

func (*Framer) Encode

func (f *Framer) Encode(frame *Frame) packet.IPacket

type IClientTransport

// IClientTransport is the client-side transport interface; NewTcpClient
// returns a value of this type.
type IClientTransport interface {
	// Write sends a packet on the given stream; isLast is an extra flag the
	// server-side Write does not take.
	// NOTE(review): presumably isLast marks the final write on the stream — confirm.
	Write(stream *Stream, pack packet.IPacket, isLast bool) error
	// WriteData is the raw-bytes counterpart of Write.
	WriteData(s *Stream, data []byte, isLast bool) error
	// Close closes the transport, taking a reason string.
	Close(reason string) error
	// NewStream creates a stream from a context and an initial size.
	NewStream(ctx context.Context, initialSize int) (*Stream, error)
	// CloseStream closes the stream identified by streamId.
	CloseStream(streamId int64)
}

func NewTcpClient

func NewTcpClient(_ops DialOption) IClientTransport

type IServerTransport

// IServerTransport is the server-side transport interface; it is the type of
// the first argument passed to ConnOption.StreamHandler.
type IServerTransport interface {
	// Write sends a packet on the given stream.
	Write(stream *Stream, pack packet.IPacket) error
	// WriteData is the raw-bytes counterpart of Write.
	WriteData(stream *Stream, data []byte) (err error)
	// Close closes the transport, taking a reason string.
	Close(reason string) error
	// CloseStream closes the stream identified by streamId.
	CloseStream(streamId int64)
}

type IUnboundedChan

// IUnboundedChan is a generic channel-like interface over messages of type V.
type IUnboundedChan[V any] interface {
	// Send submits one message and may return an error.
	Send(msg V) error
	// Receive hands messages to the consumer callback.
	// NOTE(review): the meaning of the callback's bool result (continue vs. stop) is not visible here — confirm.
	Receive(consumer func(msg V) bool)
	// Close closes the channel.
	Close()
}

type ReceiveBuf

type ReceiveBuf struct {
	// contains filtered or unexported fields
}

func NewReceiveBuf

func NewReceiveBuf(size int) *ReceiveBuf

func (*ReceiveBuf) OnClose

func (rb *ReceiveBuf) OnClose()

type SenderWrapper

type SenderWrapper struct {
	// contains filtered or unexported fields
}

func NewSender

func NewSender(sender func(body packet.IPacket) error) *SenderWrapper

func (*SenderWrapper) OnClose

func (ins *SenderWrapper) OnClose()

func (*SenderWrapper) Send

func (ins *SenderWrapper) Send(data packet.IPacket) error

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream(_id int64, initialSize int, f context.Context, _ts *TcpServer, _tc *TcpClient) *Stream

func (*Stream) Close

func (s *Stream) Close()

func (*Stream) GetCtx

func (s *Stream) GetCtx() context.Context

func (*Stream) GetErr

func (s *Stream) GetErr() error

func (*Stream) Id

func (s *Stream) Id() int64

func (*Stream) OnClose

func (s *Stream) OnClose()

func (*Stream) OnElegantlyClose

func (s *Stream) OnElegantlyClose()

func (*Stream) Read

func (s *Stream) Read() (packet.IPacket, error)

type StreamMsg

type StreamMsg struct {
	// contains filtered or unexported fields
}

type StreamState

// StreamState is a uint32-based enumeration of stream states.
type StreamState uint32
// Typed StreamState values, numbered from 0 by iota.
const (
	StreamActive StreamState = iota // 0
	StreamWriteDone // 1
)

type Streams

type Streams struct {
	// contains filtered or unexported fields
}

func NewStreams

func NewStreams() *Streams

func (*Streams) Close

func (ins *Streams) Close(onClose func(stream *Stream))

func (*Streams) Del

func (ins *Streams) Del(id int64)

func (*Streams) Exist

func (ins *Streams) Exist(id int64) (exist bool)

func (*Streams) Get

func (ins *Streams) Get(id int64) (*Stream, bool)

func (*Streams) GetAndDel

func (ins *Streams) GetAndDel(id int64) (*Stream, bool)

func (*Streams) Len

func (ins *Streams) Len() int

func (*Streams) Range

func (ins *Streams) Range(iter func(stream *Stream))

func (*Streams) Reg

func (ins *Streams) Reg(id int64, s *Stream)

func (*Streams) StreamId

func (ins *Streams) StreamId() int64

type TcpClient

type TcpClient struct {
	// contains filtered or unexported fields
}

func (*TcpClient) Close

func (tc *TcpClient) Close(reason string) error

func (*TcpClient) CloseStream

func (tc *TcpClient) CloseStream(streamId int64)

func (*TcpClient) DialWithOps

func (tc *TcpClient) DialWithOps(ops DialOption)

func (*TcpClient) HandleData

func (tc *TcpClient) HandleData(in *Frame)

func (*TcpClient) NewStream

func (tc *TcpClient) NewStream(ctx context.Context, initialSize int) (*Stream, error)

func (*TcpClient) SendData

func (tc *TcpClient) SendData(data packet.IPacket) error

func (*TcpClient) StateCompareAndSwap

func (tc *TcpClient) StateCompareAndSwap(old, new uint32) bool

func (*TcpClient) Write

func (tc *TcpClient) Write(s *Stream, pack packet.IPacket, isLast bool) error

Write does not implicitly call IPacket.Return on the TcpClient side; the caller must call it explicitly to return the packet to the pool.

func (*TcpClient) WriteData

func (tc *TcpClient) WriteData(s *Stream, data []byte, isLast bool) error

type TcpCodec

type TcpCodec struct {
	// contains filtered or unexported fields
}

TcpCodec TODO: compression is not supported.

func NewTcpCodec

func NewTcpCodec(max uint32, _isGzip bool) TcpCodec

func (TcpCodec) BlockDecode

func (tcd TcpCodec) BlockDecode(conn net.Conn, header, body []byte) (packet.IPacket, error)

func (TcpCodec) EncodeBody

func (tcd TcpCodec) EncodeBody(body packet.IPacket) packet.IPacket

EncodeBody implements the message encoding protocol. Body layout: size<int32> | gzipped<bool> | body<bytes>

type TcpServer

type TcpServer struct {
	// contains filtered or unexported fields
}

func NewTcpServer

func NewTcpServer(ctx context.Context, _conn net.Conn, ops *ConnOption) *TcpServer

func (*TcpServer) Close

func (ts *TcpServer) Close(_ string) error

func (*TcpServer) CloseStream

func (ts *TcpServer) CloseStream(streamId int64)

func (*TcpServer) HandleData

func (ts *TcpServer) HandleData(in *Frame)

func (*TcpServer) HandleLoop

func (ts *TcpServer) HandleLoop()

func (*TcpServer) OnData

func (ts *TcpServer) OnData(data packet.IPacket) error

func (*TcpServer) SendData

func (ts *TcpServer) SendData(body packet.IPacket) error

SendData implicitly calls body.Return. Encoding: size<int32> | gzipped<bool> | body<bytes>

func (*TcpServer) Write

func (ts *TcpServer) Write(stream *Stream, pack packet.IPacket) (err error)

Write does not implicitly call IPacket.Return on the TcpServer side; the caller must call it explicitly to return the packet to the pool.

func (*TcpServer) WriteData

func (ts *TcpServer) WriteData(stream *Stream, data []byte) (err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL