tcp

package
v0.0.0-...-da72ffe Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2025 License: MIT Imports: 24 Imported by: 0

Documentation

Overview

pkg/transports/tcp/connection.go

pkg/transports/tcp/connection_context.go

pkg/transports/tcp/multiplexing_handler.go

pkg/transports/tcp/state.go

pkg/transports/tcp/test_helpers.go

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateRequestMessage

func CreateRequestMessage(protocolType types.ProtocolType, payload []byte) []byte

CreateRequestMessage is a helper function that creates a length-prefixed message with a ProtocolType (for requests).

func CreateResponseMessage

func CreateResponseMessage(payload []byte) []byte

CreateResponseMessage is a helper function that creates a length-prefixed message without a ProtocolType (for responses).

func GetFreePort

func GetFreePort() (int, error)

GetFreePort attempts to find an available port and confirm that it's truly available.

func GetFreePortForTest

func GetFreePortForTest(t testing.TB) int

GetFreePortForTest attempts to find an available port and confirm that it's truly available.

Types

type ChunkedMessageInfo

type ChunkedMessageInfo struct {
	TotalSize   uint32        // Total expected message size
	CurrentSize uint32        // Current bytes received
	Buffer      *bytes.Buffer // Buffer for building the complete message
}

ChunkedMessageInfo stores state for reassembling large messages that exceed the TCP packet size.

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection implements transports.Connection for TCP.

func NewTCPConnection

func NewTCPConnection(conn gnet.Conn) *Connection

NewTCPConnection creates a new Connection that wraps the given gnet.Conn.

func (*Connection) Close

func (c *Connection) Close() error

Close closes the connection.

func (*Connection) Conn

func (c *Connection) Conn() gnet.Conn

func (*Connection) RemoteAddr

func (c *Connection) RemoteAddr() string

RemoteAddr returns the remote address of the connection.

func (*Connection) Send

func (c *Connection) Send(data []byte) error

Send sends data to the connection without additional framing. Since the custom OnTraffic handler handles raw HTTP, no length prefix or protocol type byte is needed.

func (*Connection) SendCustom

func (c *Connection) SendCustom(data []byte) error

SendCustom sends data to the connection with a length prefix.

func (*Connection) SendWithCallback

func (c *Connection) SendWithCallback(data []byte, callback gnet.AsyncCallback) error

type ConnectionContext

type ConnectionContext struct {
	Buffer             []byte
	BytesReader        *bytes.Reader
	BufioReader        *bufio.Reader
	Conn               transports.Connection
	WSUpgraded         bool                // Indicates if the connection has been upgraded to WebSocket.
	ProtocolState      any                 // Holds protocol-specific state (e.g., WebSocket codec).
	Protocol           string              // Tracks the protocol ("http", "websocket").
	Ctx                context.Context     // Context for this connection.
	Cancel             context.CancelFunc  // Cancel function to cancel the context.
	ChunkedMessageInfo *ChunkedMessageInfo // For handling large chunked messages
}

ConnectionContext holds the per-connection buffer and metadata.

type MultiplexingTrafficHandler

type MultiplexingTrafficHandler struct {
	// contains filtered or unexported fields
}

MultiplexingTrafficHandler multiplexes between different protocol handlers.

func NewMultiplexingTrafficHandler

func NewMultiplexingTrafficHandler(httpHandler TrafficHandler, wsHandler TrafficHandler, logger logger.Logger) *MultiplexingTrafficHandler

func (*MultiplexingTrafficHandler) Handle

func (*MultiplexingTrafficHandler) OnClose

func (m *MultiplexingTrafficHandler) OnClose(ctx *ConnectionContext, conn gnet.Conn)

OnClose delegates the OnClose call to the appropriate protocol handler.

type OnCloseHandlerFn

type OnCloseHandlerFn func(ctx *ConnectionContext, c gnet.Conn)

type OnTrafficHandlerFn

type OnTrafficHandlerFn func(ctx *ConnectionContext, c gnet.Conn) (action gnet.Action)

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server represents the TCP server.

func NewServer

func NewServer(ctx context.Context, cnf config.TcpTransport, logger logger.Logger, obs *observability.Observability) (*Server, error)

NewServer creates a new Server instance with the provided configuration.

func SetupServerTest

SetupServerTest sets up the TCP server with a dynamic port.

func (*Server) Addr

func (s *Server) Addr() string

Addr returns the TCP address as a string.

func (*Server) DeregisterHandler

func (s *Server) DeregisterHandler(protocolType types.HandlerType)

DeregisterHandler removes a handler for a specific protocol type.

func (*Server) OnBoot

func (s *Server) OnBoot(eng gnet.Engine) (action gnet.Action)

OnBoot is called when the server starts.

func (*Server) OnClose

func (s *Server) OnClose(c gnet.Conn, err error) (action gnet.Action)

OnClose is called when a connection is closed.

func (*Server) OnOpen

func (s *Server) OnOpen(c gnet.Conn) (out []byte, action gnet.Action)

OnOpen is called when a new connection is opened.

func (*Server) OnShutdown

func (s *Server) OnShutdown(eng gnet.Engine)

OnShutdown is called when the server is shutting down.

func (*Server) OnTick

func (s *Server) OnTick() (delay time.Duration, action gnet.Action)

OnTick is called periodically by gnet.

func (*Server) OnTraffic

func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action)

OnTraffic handles incoming data.

func (*Server) RegisterHandler

func (s *Server) RegisterHandler(protocolType types.HandlerType, handler transports.Handler)

RegisterHandler registers a handler for a specific protocol type.

func (*Server) SetOnCloseHandler

func (s *Server) SetOnCloseHandler(handler OnCloseHandlerFn)

func (*Server) SetOnTrafficHandler

func (s *Server) SetOnTrafficHandler(handler OnTrafficHandlerFn)

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start initiates the TCP server with a retry mechanism.

func (*Server) Stop

func (s *Server) Stop() error

Stop gracefully stops the TCP server with retries.

func (*Server) WaitStarted

func (s *Server) WaitStarted() <-chan struct{}

WaitStarted returns a channel that is closed when the server starts.

type State

type State int

State represents the current lifecycle state of a service or component.

const (
	Uninitialized State = iota // The service has not been initialized yet.
	Initializing               // The service is currently in the process of initialization.
	Initialized                // The service has been successfully initialized.
	Starting                   // The service is in the process of starting.
	Started                    // The service has started successfully and is running.
	Failed                     // The service failed to initialize or start.
	Stopping
	Stopped // The service has been stopped.
)

Define various states a service or component can be in.

func (State) String

func (s State) String() string

String returns a string representation of the service state.

type StateManager

type StateManager struct {
	// contains filtered or unexported fields
}

StateManager manages and tracks the state of services or components.

func NewStateManager

func NewStateManager(logger logger.Logger, obs *observability.Observability) *StateManager

NewStateManager creates a new StateManager instance with the provided logger and observability.

func (*StateManager) GetState

func (sm *StateManager) GetState(stateType StateType) State

GetState retrieves the current state of a specified service or component.

func (*StateManager) SetState

func (sm *StateManager) SetState(stateType StateType, state State)

SetState updates the state of a service or component, logs the change, and emits a metric if applicable.

func (*StateManager) WaitForState

func (sm *StateManager) WaitForState(stateType StateType, desiredState State, timeout time.Duration) error

WaitForState waits for a service or component to reach a desired state within a given timeout period. Returns an error if the timeout expires before the desired state is reached.

type StateType

type StateType string

StateType represents a type of service or component state that the StateManager tracks.

const (
	ServerStateType StateType = "server"
)

Define service and component state types.

func (StateType) String

func (s StateType) String() string

type TrafficHandler

type TrafficHandler interface {
	Handle(ctx *ConnectionContext, conn gnet.Conn) gnet.Action
	OnClose(ctx *ConnectionContext, conn gnet.Conn)
}

TrafficHandler is an interface that both HTTPHandler and WebSocketHandler implement.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL