conn

package module
v1.0.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 10, 2025 License: BSD-3-Clause Imports: 6 Imported by: 0

README

Conn Package

Go Reference License

A comprehensive Go package for network connection management with advanced features including statistics tracking, rate limiting, and protocol-specific operations.

Features

  • Thread-safe, bidirectional data exchange between network connections
  • Idle timeout support for automatic connection cleanup
  • Efficient error handling and resource management
  • TimeoutReader for per-read timeout control
  • StatConn for connection statistics tracking (RX/TX bytes) with protocol-specific methods
  • RateLimiter for bandwidth control using token bucket algorithm
  • Global rate limiting with separate read/write speed controls
  • TCP-specific methods (KeepAlive, NoDelay, Linger, CloseRead/Write)
  • UDP-specific methods (ReadFromUDP, WriteToUDP, Buffer control)
  • Connection type detection and safe type conversion

Installation

go get github.com/NodePassProject/conn

Usage

TCP to TCP Data Exchange
package main

import (
    "fmt"
    "net"
    "time"
    "github.com/NodePassProject/conn"
)

func main() {
    // Example with two TCP connections
    conn1, err := net.Dial("tcp", "server1.example.com:8080")
    if err != nil {
        fmt.Printf("Failed to connect to server1: %v\n", err)
        return
    }
    defer conn1.Close()

    conn2, err := net.Dial("tcp", "server2.example.com:9090")
    if err != nil {
        fmt.Printf("Failed to connect to server2: %v\n", err)
        return
    }
    defer conn2.Close()

    // Optional: Create rate limiter (1MB/s read, 512KB/s write)
    rateLimiter := conn.NewRateLimiter(1024*1024, 512*1024)

    // Optional: Wrap connections with StatConn for statistics and rate limiting
    var rx1, tx1, rx2, tx2 uint64
    statConn1 := conn.NewStatConn(conn1, &rx1, &tx1, rateLimiter)
    statConn2 := conn.NewStatConn(conn2, &rx2, &tx2, rateLimiter)

    // Configure TCP-specific options if needed
    if statConn1.IsTCP() {
        statConn1.SetKeepAlive(true)
        statConn1.SetKeepAlivePeriod(30 * time.Second)
        statConn1.SetNoDelay(true) // Disable Nagle algorithm for low latency
    }

    // Exchange data between the two connections with a 5-second idle timeout
    err = conn.DataExchange(statConn1, statConn2, 5*time.Second)
    if err != nil && err.Error() != "EOF" {
        fmt.Printf("Data exchange error: %v\n", err)
    }
    
    // Print statistics
    fmt.Printf("Conn1 - RX: %d bytes, TX: %d bytes\n", statConn1.GetRX(), statConn1.GetTX())
    fmt.Printf("Conn2 - RX: %d bytes, TX: %d bytes\n", statConn2.GetRX(), statConn2.GetTX())
}
TimeoutReader

TimeoutReader is a wrapper for net.Conn that allows you to set a read timeout for each read operation. It is used internally by DataExchange, but can also be used directly if needed:

import "github.com/NodePassProject/conn"

tr := &conn.TimeoutReader{Conn: tcpConn, Timeout: 5 * time.Second}
buf := make([]byte, 4096)
n, err := tr.Read(buf)
StatConn

StatConn is a wrapper for net.Conn that tracks connection statistics (received and transmitted bytes) and supports optional rate limiting. It implements the net.Conn interface and can be used as a drop-in replacement. It also provides protocol-specific methods for TCP and UDP connections.

import (
    "sync/atomic"
    "github.com/NodePassProject/conn"
)

// Basic usage without rate limiting
var rxBytes, txBytes uint64
statConn := conn.NewStatConn(tcpConn, &rxBytes, &txBytes, nil)

// Usage with rate limiting (1MB/s read, 512KB/s write)
rateLimiter := conn.NewRateLimiter(1024*1024, 512*1024)
statConnWithLimit := conn.NewStatConn(tcpConn, &rxBytes, &txBytes, rateLimiter)

// Use statConn like a normal net.Conn
// The rxBytes and txBytes variables will be updated automatically
// Rate limiting is applied automatically if Rate is set
n, err := statConnWithLimit.Write(data)
fmt.Printf("Total bytes sent: %d\n", statConn.GetTX())
fmt.Printf("Total bytes received: %d\n", statConn.GetRX())
TCP-Specific Methods

When the underlying connection is a TCP connection, you can use these specialized methods:

// Check if it's a TCP connection
if statConn.IsTCP() {
    // Configure TCP-specific options
    err := statConn.SetKeepAlive(true)
    err = statConn.SetKeepAlivePeriod(30 * time.Second)
    err = statConn.SetNoDelay(true)  // Disable Nagle algorithm
    err = statConn.SetLinger(10)     // Set linger timeout
    
    // Graceful shutdown
    err = statConn.CloseWrite()  // Close write end
    err = statConn.CloseRead()   // Close read end
}

// Safe type conversion
if tcpConn, ok := statConn.AsTCPConn(); ok {
    // Direct access to *net.TCPConn if needed
    _ = tcpConn
}
UDP-Specific Methods

When the underlying connection is a UDP connection, you can use these specialized methods:

// Check if it's a UDP connection
if statConn.IsUDP() {
    // UDP-specific read/write with address information
    buffer := make([]byte, 1024)
    n, addr, err := statConn.ReadFromUDP(buffer)
    
    // Send to specific address
    n, err = statConn.WriteToUDP(data, remoteAddr)
    
    // Configure UDP buffer sizes
    err = statConn.SetReadBuffer(65536)
    err = statConn.SetWriteBuffer(65536)
    
    // Advanced UDP operations with out-of-band data
    oob := make([]byte, 256)
    n, oobn, flags, addr, err := statConn.ReadMsgUDP(buffer, oob)
    n, oobn, err = statConn.WriteMsgUDP(data, oob, remoteAddr)
}

// Safe type conversion
if udpConn, ok := statConn.AsUDPConn(); ok {
    // Direct access to *net.UDPConn if needed
    _ = udpConn
}
Connection Type Detection
// Check connection type
fmt.Printf("Network type: %s\n", statConn.NetworkType()) // "tcp", "udp", or "unknown"

// Type-specific checks
if statConn.IsTCP() {
    fmt.Println("This is a TCP connection")
}
if statConn.IsUDP() {
    fmt.Println("This is a UDP connection")
}

StatConn Features:

  • Automatic statistics tracking for all read/write operations
  • Optional rate limiting integration
  • Protocol-specific method access with type safety
  • Safe type conversion methods
  • Connection type detection utilities
  • All UDP methods include automatic statistics and rate limiting
RateLimiter

RateLimiter implements a token bucket algorithm for bandwidth control. It supports separate rate limiting for read and write operations:

import "github.com/NodePassProject/conn"

// Create a rate limiter with 1MB/s read and 512KB/s write limits
rateLimiter := conn.NewRateLimiter(1024*1024, 512*1024)

// Use with StatConn for automatic rate limiting
var rxBytes, txBytes uint64
statConn := conn.NewStatConn(tcpConn, &rxBytes, &txBytes, rateLimiter)

// All read/write operations will be automatically rate limited
data := make([]byte, 4096)
n, err := statConn.Read(data)  // Automatically applies read rate limit
n, err = statConn.Write(data)  // Automatically applies write rate limit

You can also use the rate limiter directly:

rateLimiter := conn.NewRateLimiter(1024*1024, 512*1024)

// Manual rate limiting
dataSize := int64(len(data))
rateLimiter.WaitWrite(dataSize)  // Wait for write tokens
n, err := conn.Write(data)

rateLimiter.WaitRead(int64(n))   // Wait for read tokens (if needed)

Rate Limiter Features:

  • Token bucket algorithm for smooth traffic shaping
  • Separate read and write rate controls
  • Thread-safe implementation using atomic operations
  • Zero value means unlimited rate (set to 0 or negative values)
  • Automatic token refill based on configured rates

Complete Examples

TCP Proxy with Statistics and Rate Limiting
package main

import (
    "fmt"
    "log"
    "net"
    "time"
    "github.com/NodePassProject/conn"
)

func handleConnection(clientConn net.Conn) {
    defer clientConn.Close()
    
    // Connect to target server
    serverConn, err := net.Dial("tcp", "target-server.com:80")
    if err != nil {
        log.Printf("Failed to connect to server: %v", err)
        return
    }
    defer serverConn.Close()
    
    // Create rate limiter (10MB/s read, 5MB/s write)
    rateLimiter := conn.NewRateLimiter(10*1024*1024, 5*1024*1024)
    
    // Wrap connections with StatConn
    var clientRX, clientTX, serverRX, serverTX uint64
    statClient := conn.NewStatConn(clientConn, &clientRX, &clientTX, rateLimiter)
    statServer := conn.NewStatConn(serverConn, &serverRX, &serverTX, rateLimiter)
    
    // Configure TCP options for better performance
    if statClient.IsTCP() {
        statClient.SetKeepAlive(true)
        statClient.SetKeepAlivePeriod(30 * time.Second)
        statClient.SetNoDelay(true)
    }
    if statServer.IsTCP() {
        statServer.SetKeepAlive(true)
        statServer.SetKeepAlivePeriod(30 * time.Second)
        statServer.SetNoDelay(true)
    }
    
    // Start data exchange with 60-second idle timeout
    start := time.Now()
    err = conn.DataExchange(statClient, statServer, 60*time.Second)
    duration := time.Since(start)
    
    // Log statistics
    totalBytes := statClient.GetTotal() + statServer.GetTotal()
    avgSpeed := float64(totalBytes) / duration.Seconds() / 1024 / 1024 // MB/s
    
    log.Printf("Connection closed - Duration: %v, Total: %d bytes, Avg Speed: %.2f MB/s",
        duration, totalBytes, avgSpeed)
    
    if err != nil && err.Error() != "EOF" {
        log.Printf("Data exchange error: %v", err)
    }
}

func main() {
    listener, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }
    defer listener.Close()
    
    log.Println("TCP proxy listening on :8080")
    
    for {
        clientConn, err := listener.Accept()
        if err != nil {
            log.Printf("Failed to accept connection: %v", err)
            continue
        }
        
        go handleConnection(clientConn)
    }
}
UDP Echo Server with Buffer Management
package main

import (
    "fmt"
    "log"
    "net"
    "github.com/NodePassProject/conn"
)

func main() {
    // Listen on UDP port
    udpAddr, err := net.ResolveUDPAddr("udp", ":8081")
    if err != nil {
        log.Fatalf("Failed to resolve UDP address: %v", err)
    }
    
    udpConn, err := net.ListenUDP("udp", udpAddr)
    if err != nil {
        log.Fatalf("Failed to listen on UDP: %v", err)
    }
    defer udpConn.Close()
    
    // Create rate limiter for UDP (1MB/s each direction)
    rateLimiter := conn.NewRateLimiter(1024*1024, 1024*1024)
    
    // Wrap with StatConn
    var rxBytes, txBytes uint64
    statConn := conn.NewStatConn(udpConn, &rxBytes, &txBytes, rateLimiter)
    
    // Configure UDP buffer sizes for better performance
    if statConn.IsUDP() {
        statConn.SetReadBuffer(65536)
        statConn.SetWriteBuffer(65536)
    }
    
    log.Println("UDP echo server listening on :8081")
    
    buffer := make([]byte, 1024)
    for {
        // Read from UDP with automatic statistics and rate limiting
        n, clientAddr, err := statConn.ReadFromUDP(buffer)
        if err != nil {
            log.Printf("Failed to read from UDP: %v", err)
            continue
        }
        
        message := string(buffer[:n])
        log.Printf("Received from %v: %s", clientAddr, message)
        
        // Echo back to client with automatic statistics and rate limiting
        response := fmt.Sprintf("Echo: %s", message)
        _, err = statConn.WriteToUDP([]byte(response), clientAddr)
        if err != nil {
            log.Printf("Failed to write to UDP: %v", err)
            continue
        }
        
        // Print statistics periodically
        if statConn.GetRX()%10240 == 0 { // Every ~10KB
            log.Printf("Stats - RX: %d bytes, TX: %d bytes, Total: %d bytes",
                statConn.GetRX(), statConn.GetTX(), statConn.GetTotal())
        }
    }
}

License

Copyright (c) 2025, NodePassProject. Licensed under the BSD 3-Clause License. See the LICENSE file for details.

Documentation

Overview

Package conn 对 net.Conn 的扩展,包括超时读取、限速统计和双向数据交换等功能

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotTCPConn = errors.New("not a TCP connection")
	ErrNotUDPConn = errors.New("not a UDP connection")
)

定义错误

Functions

func DataExchange

func DataExchange(conn1, conn2 net.Conn, idleTimeout time.Duration) error

DataExchange 实现两个 net.Conn 之间的双向数据交换,支持空闲超时

Types

type RateLimiter added in v1.0.5

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter 全局令牌桶读写限速器

func NewRateLimiter added in v1.0.5

func NewRateLimiter(readBytesPerSecond, writeBytesPerSecond int64) *RateLimiter

NewRateLimiter 创建新的全局令牌桶读写限速器

func (*RateLimiter) Reset added in v1.0.9

func (rl *RateLimiter) Reset()

Reset 重置限速器状态

func (*RateLimiter) SetRate added in v1.0.9

func (rl *RateLimiter) SetRate(readBytesPerSecond, writeBytesPerSecond int64)

SetRate 动态调整读写速率

func (*RateLimiter) WaitRead added in v1.0.5

func (rl *RateLimiter) WaitRead(bytes int64)

WaitRead 等待读取令牌

func (*RateLimiter) WaitWrite added in v1.0.5

func (rl *RateLimiter) WaitWrite(bytes int64)

WaitWrite 等待写入令牌

type StatConn added in v1.0.4

type StatConn struct {
	Conn net.Conn
	RX   *uint64
	TX   *uint64
	Rate *RateLimiter
}

StatConn 是一个包装了 net.Conn 的结构体,用于统计并限制读取和写入的字节数

func NewStatConn added in v1.0.9

func NewStatConn(conn net.Conn, rx, tx *uint64, rate *RateLimiter) *StatConn

NewStatConn 创建一个新的 StatConn

func (*StatConn) AsTCPConn added in v1.0.12

func (sc *StatConn) AsTCPConn() (*net.TCPConn, bool)

AsTCPConn 安全地将底层连接转换为 *net.TCPConn

func (*StatConn) AsUDPConn added in v1.0.12

func (sc *StatConn) AsUDPConn() (*net.UDPConn, bool)

AsUDPConn 安全地将底层连接转换为 *net.UDPConn

func (*StatConn) Close added in v1.0.4

func (sc *StatConn) Close() error

Close 关闭连接

func (*StatConn) CloseRead added in v1.0.12

func (sc *StatConn) CloseRead() error

CloseRead 关闭TCP连接的读取端

func (*StatConn) CloseWrite added in v1.0.12

func (sc *StatConn) CloseWrite() error

CloseWrite 关闭TCP连接的写入端

func (*StatConn) GetConn added in v1.0.9

func (sc *StatConn) GetConn() net.Conn

GetConn 返回底层的 net.Conn

func (*StatConn) GetRX added in v1.0.9

func (sc *StatConn) GetRX() uint64

GetRX 返回已接收的字节数

func (*StatConn) GetRate added in v1.0.9

func (sc *StatConn) GetRate() *RateLimiter

GetRate 返回当前的限速

func (*StatConn) GetTX added in v1.0.9

func (sc *StatConn) GetTX() uint64

GetTX 返回已发送的字节数

func (*StatConn) GetTotal added in v1.0.9

func (sc *StatConn) GetTotal() uint64

GetTotal 返回总的传输字节数

func (*StatConn) IsTCP added in v1.0.12

func (sc *StatConn) IsTCP() bool

IsTCP 检查底层连接是否为TCP连接

func (*StatConn) IsUDP added in v1.0.12

func (sc *StatConn) IsUDP() bool

IsUDP 检查底层连接是否为UDP连接

func (*StatConn) LocalAddr added in v1.0.4

func (sc *StatConn) LocalAddr() net.Addr

LocalAddr 返回本地地址

func (*StatConn) NetworkType added in v1.0.12

func (sc *StatConn) NetworkType() string

NetworkType 返回底层连接的网络类型

func (*StatConn) Read added in v1.0.4

func (sc *StatConn) Read(b []byte) (int, error)

Read 实现了 io.Reader 接口,读取数据时会统计读取字节数并进行限速

func (*StatConn) ReadFromUDP added in v1.0.12

func (sc *StatConn) ReadFromUDP(b []byte) (int, *net.UDPAddr, error)

ReadFromUDP 从UDP连接读取数据包,返回数据和发送方地址

func (*StatConn) ReadMsgUDP added in v1.0.12

func (sc *StatConn) ReadMsgUDP(b, oob []byte) (n, oobn, flags int, addr *net.UDPAddr, err error)

ReadMsgUDP 从UDP连接读取消息,支持读取控制信息

func (*StatConn) RemoteAddr added in v1.0.4

func (sc *StatConn) RemoteAddr() net.Addr

RemoteAddr 返回远程地址

func (*StatConn) Reset added in v1.0.9

func (sc *StatConn) Reset()

Reset 重置统计数据

func (*StatConn) SetDeadline added in v1.0.4

func (sc *StatConn) SetDeadline(t time.Time) error

SetDeadline 设置连接的读写超时

func (*StatConn) SetKeepAlive added in v1.0.12

func (sc *StatConn) SetKeepAlive(keepalive bool) error

SetKeepAlive 设置TCP连接的KeepAlive状态

func (*StatConn) SetKeepAlivePeriod added in v1.0.12

func (sc *StatConn) SetKeepAlivePeriod(d time.Duration) error

SetKeepAlivePeriod 设置TCP连接的KeepAlive周期

func (*StatConn) SetLinger added in v1.0.12

func (sc *StatConn) SetLinger(sec int) error

SetLinger 设置TCP连接的Linger时间

func (*StatConn) SetNoDelay added in v1.0.12

func (sc *StatConn) SetNoDelay(noDelay bool) error

SetNoDelay 设置TCP连接的NoDelay状态(禁用/启用Nagle算法)

func (*StatConn) SetReadBuffer added in v1.0.12

func (sc *StatConn) SetReadBuffer(bytes int) error

SetReadBuffer 设置UDP连接的读取缓冲区大小

func (*StatConn) SetReadDeadline added in v1.0.4

func (sc *StatConn) SetReadDeadline(t time.Time) error

SetReadDeadline 设置连接的读取超时

func (*StatConn) SetWriteBuffer added in v1.0.12

func (sc *StatConn) SetWriteBuffer(bytes int) error

SetWriteBuffer 设置UDP连接的写入缓冲区大小

func (*StatConn) SetWriteDeadline added in v1.0.4

func (sc *StatConn) SetWriteDeadline(t time.Time) error

SetWriteDeadline 设置连接的写入超时

func (*StatConn) Write added in v1.0.4

func (sc *StatConn) Write(b []byte) (int, error)

Write 实现了 io.Writer 接口,写入数据时会统计写入字节数并进行限速

func (*StatConn) WriteMsgUDP added in v1.0.12

func (sc *StatConn) WriteMsgUDP(b, oob []byte, addr *net.UDPAddr) (n, oobn int, err error)

WriteMsgUDP 向UDP连接发送消息,支持发送控制信息

func (*StatConn) WriteToUDP added in v1.0.12

func (sc *StatConn) WriteToUDP(b []byte, addr *net.UDPAddr) (int, error)

WriteToUDP 向指定UDP地址发送数据包

type TimeoutReader added in v1.0.2

type TimeoutReader struct {
	Conn    net.Conn
	Timeout time.Duration
}

TimeoutReader 包装了 net.Conn,支持设置读取超时

func (*TimeoutReader) Read added in v1.0.2

func (tr *TimeoutReader) Read(b []byte) (int, error)

Read 实现了 io.Reader 接口,读取数据时会设置读取超时

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL