Documentation
¶
Overview ¶
pkg/transports/tcp/connection.go
pkg/transports/tcp/connection_context.go
pkg/transports/tcp/multiplexing_handler.go
pkg/transports/tcp/state.go
pkg/transports/tcp/test_helpers.go
Index ¶
- func CreateRequestMessage(protocolType types.ProtocolType, payload []byte) []byte
- func CreateResponseMessage(payload []byte) []byte
- func GetFreePort() (int, error)
- func GetFreePortForTest(t testing.TB) int
- type ChunkedMessageInfo
- type Connection
- type ConnectionContext
- type MultiplexingTrafficHandler
- type OnCloseHandlerFn
- type OnTrafficHandlerFn
- type Server
- func (s *Server) Addr() string
- func (s *Server) DeregisterHandler(protocolType types.HandlerType)
- func (s *Server) OnBoot(eng gnet.Engine) (action gnet.Action)
- func (s *Server) OnClose(c gnet.Conn, err error) (action gnet.Action)
- func (s *Server) OnOpen(c gnet.Conn) (out []byte, action gnet.Action)
- func (s *Server) OnShutdown(eng gnet.Engine)
- func (s *Server) OnTick() (delay time.Duration, action gnet.Action)
- func (s *Server) OnTraffic(c gnet.Conn) (action gnet.Action)
- func (s *Server) RegisterHandler(protocolType types.HandlerType, handler transports.Handler)
- func (s *Server) SetOnCloseHandler(handler OnCloseHandlerFn)
- func (s *Server) SetOnTrafficHandler(handler OnTrafficHandlerFn)
- func (s *Server) Start(ctx context.Context) error
- func (s *Server) Stop() error
- func (s *Server) WaitStarted() <-chan struct{}
- type State
- type StateManager
- type StateType
- type TrafficHandler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateRequestMessage ¶
func CreateRequestMessage(protocolType types.ProtocolType, payload []byte) []byte
CreateRequestMessage Helper function to create a length-prefixed message with ProtocolType (for requests).
func CreateResponseMessage ¶
CreateResponseMessage Helper function to create a length-prefixed message without ProtocolType (for responses).
func GetFreePort ¶
GetFreePort attempts to find an available port and confirm that it's truly available.
func GetFreePortForTest ¶
GetFreePortForTest attempts to find an available port and confirm that it's truly available.
Types ¶
type ChunkedMessageInfo ¶
type ChunkedMessageInfo struct {
TotalSize uint32 // Total expected message size
CurrentSize uint32 // Current bytes received
Buffer *bytes.Buffer // Buffer for building the complete message
}
ChunkedMessageInfo stores state for reassembling large messages that exceed TCP packet size
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection implements transports.Connection for TCP.
func NewTCPConnection ¶
func NewTCPConnection(conn gnet.Conn) *Connection
NewTCPConnection creates a new TCPConnection.
func (*Connection) Conn ¶
func (c *Connection) Conn() gnet.Conn
func (*Connection) RemoteAddr ¶
func (c *Connection) RemoteAddr() string
RemoteAddr returns the remote address of the connection.
func (*Connection) Send ¶
func (c *Connection) Send(data []byte) error
Send sends data to the connection without additional framing. Since the custom OnTraffic handler handles raw HTTP, no length prefix or protocol type byte is needed.
func (*Connection) SendCustom ¶
func (c *Connection) SendCustom(data []byte) error
SendCustom sends data to the connection with a length prefix.
func (*Connection) SendWithCallback ¶
func (c *Connection) SendWithCallback(data []byte, callback gnet.AsyncCallback) error
type ConnectionContext ¶
type ConnectionContext struct {
Buffer []byte
BytesReader *bytes.Reader
BufioReader *bufio.Reader
Conn transports.Connection
WSUpgraded bool // Indicates if the connection has been upgraded to WebSocket.
ProtocolState any // Holds protocol-specific state (e.g., WebSocket codec).
Protocol string // Tracks the protocol ("http", "websocket").
Ctx context.Context // Context for this connection.
Cancel context.CancelFunc // Cancel function to cancel the context.
ChunkedMessageInfo *ChunkedMessageInfo // For handling large chunked messages
}
ConnectionContext holds the per-connection buffer and metadata.
type MultiplexingTrafficHandler ¶
type MultiplexingTrafficHandler struct {
// contains filtered or unexported fields
}
MultiplexingTrafficHandler multiplexes between different protocol handlers.
func NewMultiplexingTrafficHandler ¶
func NewMultiplexingTrafficHandler(httpHandler TrafficHandler, wsHandler TrafficHandler, logger logger.Logger) *MultiplexingTrafficHandler
func (*MultiplexingTrafficHandler) Handle ¶
func (m *MultiplexingTrafficHandler) Handle(ctx *ConnectionContext, conn gnet.Conn) gnet.Action
func (*MultiplexingTrafficHandler) OnClose ¶
func (m *MultiplexingTrafficHandler) OnClose(ctx *ConnectionContext, conn gnet.Conn)
OnClose delegates the OnClose call to the appropriate protocol handler.
type OnCloseHandlerFn ¶
type OnCloseHandlerFn func(ctx *ConnectionContext, c gnet.Conn)
type OnTrafficHandlerFn ¶
type OnTrafficHandlerFn func(ctx *ConnectionContext, c gnet.Conn) (action gnet.Action)
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server represents the TCP server.
func NewServer ¶
func NewServer(ctx context.Context, cnf config.TcpTransport, logger logger.Logger, obs *observability.Observability) (*Server, error)
NewServer creates a new Server instance with the provided configuration.
func SetupServerTest ¶
func SetupServerTest(t testing.TB, ctx context.Context, cfg config.TcpTransport) (logger.Logger, *observability.Observability, *Server)
SetupServerTest sets up the TCP server with a dynamic port.
func (*Server) DeregisterHandler ¶
func (s *Server) DeregisterHandler(protocolType types.HandlerType)
DeregisterHandler removes a handler for a specific protocol type.
func (*Server) OnShutdown ¶
OnShutdown is called when the server is shutting down.
func (*Server) RegisterHandler ¶
func (s *Server) RegisterHandler(protocolType types.HandlerType, handler transports.Handler)
RegisterHandler registers a handler for a specific protocol type.
func (*Server) SetOnCloseHandler ¶
func (s *Server) SetOnCloseHandler(handler OnCloseHandlerFn)
func (*Server) SetOnTrafficHandler ¶
func (s *Server) SetOnTrafficHandler(handler OnTrafficHandlerFn)
func (*Server) WaitStarted ¶
func (s *Server) WaitStarted() <-chan struct{}
WaitStarted returns a channel that is closed when the server starts.
type State ¶
type State int
State represents the current state of a particular node component type.
const ( Uninitialized State = iota // The service has not been initialized yet. Initializing // The service is currently in the process of initialization. Initialized // The service has been successfully initialized. Starting // The service is in the process of starting. Started // The service has started successfully and is running. Failed // The service failed to initialize or start. Stopping Stopped // The service has been stopped. )
Define various states a service or component can be in.
type StateManager ¶
type StateManager struct {
// contains filtered or unexported fields
}
StateManager manages and tracks the state of services or components.
func NewStateManager ¶
func NewStateManager(logger logger.Logger, obs *observability.Observability) *StateManager
NewStateManager creates a new StateManager instance with the provided logger and observability.
func (*StateManager) GetState ¶
func (sm *StateManager) GetState(stateType StateType) State
GetState retrieves the current state of a specified service or component.
func (*StateManager) SetState ¶
func (sm *StateManager) SetState(stateType StateType, state State)
SetState updates the state of a service or component, logs the change, and emits a metric if applicable.
func (*StateManager) WaitForState ¶
func (sm *StateManager) WaitForState(stateType StateType, desiredState State, timeout time.Duration) error
WaitForState waits for a service or component to reach a desired state within a given timeout period. Returns an error if the timeout expires before the desired state is reached.
type StateType ¶
type StateType string
StateType represents a type of service or component state that the StateManager tracks.
const (
ServerStateType StateType = "server"
)
Define service and component state types.
type TrafficHandler ¶
type TrafficHandler interface {
Handle(ctx *ConnectionContext, conn gnet.Conn) gnet.Action
OnClose(ctx *ConnectionContext, conn gnet.Conn)
}
TrafficHandler is an interface that both HTTPHandler and WebSocketHandler implement.