conn

package module
v1.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 13, 2025 License: BSD-3-Clause Imports: 5 Imported by: 0

README

Conn Package

Go Reference License

A simple and efficient TCP connection data exchange utility for Go applications.

Features

  • Thread-safe, bidirectional data exchange between two TCP connections
  • Idle timeout support for automatic connection cleanup
  • Efficient error handling and resource management
  • TimeoutReader for per-read timeout control
  • StatConn for connection statistics tracking (RX/TX bytes)
  • RateLimiter for bandwidth control using token bucket algorithm
  • Global rate limiting with separate read/write speed controls

Installation

go get github.com/NodePassProject/conn

Usage

TCP to TCP Data Exchange
package main

import (
    "fmt"
    "net"
    "time"
    "io"
    "sync/atomic"

    "github.com/NodePassProject/conn"
)

func main() {
    // Example with two TCP connections
    conn1, err := net.Dial("tcp", "server1.example.com:8080")
    if err != nil {
        fmt.Printf("Failed to connect to server1: %v\n", err)
        return
    }
    defer conn1.Close()

    conn2, err := net.Dial("tcp", "server2.example.com:9090")
    if err != nil {
        fmt.Printf("Failed to connect to server2: %v\n", err)
        return
    }
    defer conn2.Close()

    // Optional: Create rate limiter (1MB/s read, 512KB/s write)
    rateLimiter := conn.NewRateLimiter(1024*1024, 512*1024)

    // Optional: Wrap connections with StatConn for statistics and rate limiting
    var rx1, tx1, rx2, tx2 uint64
    statConn1 := &conn.StatConn{Conn: conn1, RX: &rx1, TX: &tx1, Rate: rateLimiter}
    statConn2 := &conn.StatConn{Conn: conn2, RX: &rx2, TX: &tx2, Rate: rateLimiter}

    // Exchange data between the two connections with a 5-second idle timeout
    bytesAtoB, bytesBtoA, err := conn.DataExchange(statConn1, statConn2, 5*time.Second)
    if err != nil && err != io.EOF {
        fmt.Printf("Data exchange error: %v\n", err)
    }

    fmt.Printf("Transferred %d bytes from server1 to server2\n", bytesAtoB)
    fmt.Printf("Transferred %d bytes from server2 to server1\n", bytesBtoA)
    fmt.Printf("Total RX: %d bytes, Total TX: %d bytes\n", 
        atomic.LoadUint64(&rx1)+atomic.LoadUint64(&rx2),
        atomic.LoadUint64(&tx1)+atomic.LoadUint64(&tx2))
}
TimeoutReader

TimeoutReader is a wrapper for net.Conn that allows you to set a read timeout for each read operation. It is used internally by DataExchange, but can also be used directly if needed:

import "github.com/NodePassProject/conn"

tr := &conn.TimeoutReader{Conn: tcpConn, Timeout: 5 * time.Second}
buf := make([]byte, 4096)
n, err := tr.Read(buf)
StatConn

StatConn is a wrapper for net.Conn that tracks connection statistics (received and transmitted bytes) and supports optional rate limiting. It implements the net.Conn interface and can be used as a drop-in replacement:

import (
    "sync/atomic"
    "github.com/NodePassProject/conn"
)

// Basic usage without rate limiting
var rxBytes, txBytes uint64
statConn := &conn.StatConn{
    Conn: tcpConn,
    RX:   &rxBytes,
    TX:   &txBytes,
}

// Usage with rate limiting (1MB/s read, 512KB/s write)
rateLimiter := conn.NewRateLimiter(1024*1024, 512*1024)
statConnWithLimit := &conn.StatConn{
    Conn: tcpConn,
    RX:   &rxBytes,
    TX:   &txBytes,
    Rate: rateLimiter, // Enable rate limiting
}

// Use statConn like a normal net.Conn
// The rxBytes and txBytes variables will be updated automatically
// Rate limiting is applied automatically if Rate is set
n, err := statConnWithLimit.Write(data)
fmt.Printf("Total bytes sent: %d\n", atomic.LoadUint64(&txBytes))
fmt.Printf("Total bytes received: %d\n", atomic.LoadUint64(&rxBytes))
RateLimiter

RateLimiter implements a token bucket algorithm for bandwidth control. It supports separate rate limiting for read and write operations:

import "github.com/NodePassProject/conn"

// Create a rate limiter with 1MB/s read and 512KB/s write limits
rateLimiter := conn.NewRateLimiter(1024*1024, 512*1024)

// Use with StatConn for automatic rate limiting
var rxBytes, txBytes uint64
statConn := &conn.StatConn{
    Conn: tcpConn,
    RX:   &rxBytes,
    TX:   &txBytes,
    Rate: rateLimiter, // Enable rate limiting
}

// All read/write operations will be automatically rate limited
data := make([]byte, 4096)
n, err := statConn.Read(data)  // Automatically applies read rate limit
n, err = statConn.Write(data)  // Automatically applies write rate limit

You can also use the rate limiter directly:

rateLimiter := conn.NewRateLimiter(1024*1024, 512*1024)

// Manual rate limiting
dataSize := int64(len(data))
rateLimiter.WaitWrite(dataSize)  // Wait for write tokens
n, err := conn.Write(data)

rateLimiter.WaitRead(int64(n))   // Wait for read tokens (if needed)

Rate Limiter Features:

  • Token bucket algorithm for smooth traffic shaping
  • Separate read and write rate controls
  • Thread-safe implementation using atomic operations
  • Zero value means unlimited rate (set to 0 or negative values)
  • Automatic token refill based on configured rates

License

Copyright (c) 2025, NodePassProject. Licensed under the BSD 3-Clause License. See the LICENSE file for details.

Documentation

Overview

Package conn 对 net.Conn 的扩展,包括超时读取、限速统计和双向数据交换等功能

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DataExchange

func DataExchange(conn1, conn2 net.Conn, idleTimeout time.Duration) (int64, int64, error)

DataExchange 实现两个 net.Conn 之间的双向数据交换,支持空闲超时

Types

type RateLimiter added in v1.0.5

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter 全局令牌桶读写限速器

func NewRateLimiter added in v1.0.5

func NewRateLimiter(readBytesPerSecond, writeBytesPerSecond int64) *RateLimiter

NewRateLimiter 创建新的全局令牌桶读写限速器

func (*RateLimiter) Reset added in v1.0.8

func (rl *RateLimiter) Reset()

Reset 重置限速器状态

func (*RateLimiter) SetRate added in v1.0.8

func (rl *RateLimiter) SetRate(readBytesPerSecond, writeBytesPerSecond int64)

SetRate 动态调整读写速率

func (*RateLimiter) WaitRead added in v1.0.5

func (rl *RateLimiter) WaitRead(bytes int64)

WaitRead 等待读取令牌

func (*RateLimiter) WaitWrite added in v1.0.5

func (rl *RateLimiter) WaitWrite(bytes int64)

WaitWrite 等待写入令牌

type StatConn added in v1.0.4

type StatConn struct {
	Conn net.Conn
	RX   *uint64
	TX   *uint64
	Rate *RateLimiter
}

StatConn 是一个包装了 net.Conn 的结构体,用于统计并限制读取和写入的字节数

func NewStatConn added in v1.0.8

func NewStatConn(conn net.Conn, rx, tx *uint64, rate *RateLimiter) *StatConn

NewStatConn 创建一个新的 StatConn

func (*StatConn) Close added in v1.0.4

func (sc *StatConn) Close() error

Close 关闭连接

func (*StatConn) GetConn added in v1.0.8

func (sc *StatConn) GetConn() net.Conn

GetConn 返回底层的 net.Conn

func (*StatConn) GetRX added in v1.0.8

func (sc *StatConn) GetRX() uint64

GetRX 返回已接收的字节数

func (*StatConn) GetRate added in v1.0.8

func (sc *StatConn) GetRate() *RateLimiter

GetRate 返回当前的限速

func (*StatConn) GetTX added in v1.0.8

func (sc *StatConn) GetTX() uint64

GetTX 返回已发送的字节数

func (*StatConn) GetTotal added in v1.0.8

func (sc *StatConn) GetTotal() uint64

GetTotal 返回总的传输字节数

func (*StatConn) LocalAddr added in v1.0.4

func (sc *StatConn) LocalAddr() net.Addr

LocalAddr 返回本地地址

func (*StatConn) Read added in v1.0.4

func (sc *StatConn) Read(b []byte) (int, error)

Read 实现了 io.Reader 接口,读取数据时会统计读取字节数并进行限速

func (*StatConn) RemoteAddr added in v1.0.4

func (sc *StatConn) RemoteAddr() net.Addr

RemoteAddr 返回远程地址

func (*StatConn) Reset added in v1.0.8

func (sc *StatConn) Reset()

Reset 重置统计数据

func (*StatConn) SetDeadline added in v1.0.4

func (sc *StatConn) SetDeadline(t time.Time) error

SetDeadline 设置连接的读写超时

func (*StatConn) SetReadDeadline added in v1.0.4

func (sc *StatConn) SetReadDeadline(t time.Time) error

SetReadDeadline 设置连接的读取超时

func (*StatConn) SetWriteDeadline added in v1.0.4

func (sc *StatConn) SetWriteDeadline(t time.Time) error

SetWriteDeadline 设置连接的写入超时

func (*StatConn) Write added in v1.0.4

func (sc *StatConn) Write(b []byte) (int, error)

Write 实现了 io.Writer 接口,写入数据时会统计写入字节数并进行限速

type TimeoutReader added in v1.0.2

type TimeoutReader struct {
	Conn    net.Conn
	Timeout time.Duration
}

TimeoutReader 包装了 net.Conn,支持设置读取超时

func (*TimeoutReader) Read added in v1.0.2

func (tr *TimeoutReader) Read(b []byte) (int, error)

Read 实现了 io.Reader 接口,读取数据时会设置读取超时

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL