nntppool

package module
v3.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2026 License: MIT Imports: 20 Imported by: 0

README

nntppool

Go Reference Go Report Card

High-performance, concurrent NNTP client library for Go with streaming support, automatic failover, and built-in metrics.

Features

  • Streaming API with memory-efficient BodyReader for large article downloads
  • Multi-provider support with automatic failover (Primary/Backup tiers)
  • SOCKS proxy support (socks4, socks4a, socks5) with authentication
  • Built-in metrics with real-time throughput monitoring
  • Automatic yEnc decoding with header callbacks for early metadata access
  • Connection lifecycle management (idle timeout, max lifetime, automatic rotation)
  • High concurrency with configurable inflight limits
  • Health checking with automatic provider recovery
  • TLS support for secure NNTPS connections
  • Context-aware for timeout and cancellation support

Installation

go get github.com/javi11/nntppool/v3

Quick Start

package main

import (
    "context"
    "crypto/tls"
    "fmt"
    "os"

    "github.com/javi11/nntppool/v3"
)

func main() {
    ctx := context.Background()

    // Create provider configuration
    provider, err := nntppool.NewProvider(ctx, nntppool.ProviderConfig{
        Address:        "news.example.com:563",
        MaxConnections: 10,
        Auth: nntppool.Auth{
            Username: "user",
            Password: "pass",
        },
        TLSConfig: &tls.Config{},
    })
    if err != nil {
        panic(err)
    }
    defer provider.Close()

    // Create client (implements NNTPClient interface)
    client := nntppool.NewClient()
    err = client.AddProvider(provider, nntppool.ProviderPrimary)
    if err != nil {
        panic(err)
    }
    defer client.Close()

    // Download article body
    file, err := os.Create("output.bin")
    if err != nil {
        panic(err)
    }
    defer file.Close()

    if err := client.Body(ctx, "<message-id@example.com>", file); err != nil {
        panic(err)
    }

    fmt.Println("Article downloaded successfully!")
}

Note: The Client type implements the NNTPClient interface. See types.go for the complete interface definition.

Usage Examples

Basic Article Retrieval
// Simple download to io.Writer
err := client.Body(ctx, "<message-id>", outputFile)
Streaming with yEnc Headers

Get early access to yEnc metadata while streaming the body:

reader, err := client.BodyReader(ctx, "<message-id>")
if err != nil {
    log.Fatal(err)
}
defer reader.Close()

// Headers become available immediately after parsing (non-blocking)
headers := reader.YencHeaders()
if headers != nil {
    fmt.Printf("File: %s, Size: %d bytes\n", headers.FileName, headers.FileSize)
}

// Stream decoded body to file
_, err = io.Copy(outputFile, reader)
Multi-Segment Downloads

Write directly to file offsets for parallel multi-segment downloads:

// file must implement io.WriterAt (e.g., *os.File)
err := client.BodyAt(ctx, "<segment-message-id>", file)
Multi-Provider Setup

Configure primary and backup providers with automatic failover:

// Primary provider
primaryProvider, _ := nntppool.NewProvider(ctx, nntppool.ProviderConfig{
    Address:        "primary.news.com:563",
    MaxConnections: 10,
    Auth:           nntppool.Auth{Username: "user", Password: "pass"},
    TLSConfig:      &tls.Config{},
})

// Backup provider
backupProvider, _ := nntppool.NewProvider(ctx, nntppool.ProviderConfig{
    Address:        "backup.news.com:563",
    MaxConnections: 5,
    Auth:           nntppool.Auth{Username: "user2", Password: "pass2"},
    TLSConfig:      &tls.Config{},
})

// Add to client with tier priority
client := nntppool.NewClient()
err = client.AddProvider(primaryProvider, nntppool.ProviderPrimary)
if err != nil {
    panic(err)
}
err = client.AddProvider(backupProvider, nntppool.ProviderBackup)
if err != nil {
    panic(err)
}

// Client automatically tries primary first, falls back to backup on errors
SOCKS Proxy Configuration
config := nntppool.ProviderConfig{
    Address:        "news.example.com:119",
    MaxConnections: 10,
    ProxyURL:       "socks5://user:pass@proxy.example.com:1080",
    Auth:           nntppool.Auth{Username: "newsuser", Password: "newspass"},
}

provider, err := nntppool.NewProvider(ctx, config)
Metrics Monitoring

Monitor real-time throughput and connection status:

metrics := client.Metrics()
for host, m := range metrics {
    fmt.Printf("%s: %.2f MB/s (%d active connections)\n",
        host, m.ThroughputMB, m.ActiveConnections)
    fmt.Printf("  Total read: %d bytes, written: %d bytes\n",
        m.TotalBytesRead, m.TotalBytesWritten)
}
Speed Testing

Measure download performance across multiple articles:

articleIDs := []string{
    "<article1@example.com>",
    "<article2@example.com>",
    "<article3@example.com>",
}

stats, err := client.SpeedTest(ctx, articleIDs)
if err != nil {
    log.Fatal(err)
}

fmt.Printf("Speed: %.2f MB/s\n", stats.BytesPerSecond/1024/1024)
fmt.Printf("Success: %d, Failures: %d\n", stats.SuccessCount, stats.FailureCount)
fmt.Printf("Total: %d bytes in %v\n", stats.TotalBytes, stats.Duration)

Configuration Reference

ProviderConfig Options
Field Type Required Default Description
Address string - NNTP server address in host:port format
MaxConnections int - Maximum concurrent connections to maintain
InitialConnections int 0 Number of connections to create on startup (lazy if 0)
InflightPerConnection int 1 Maximum concurrent requests per connection
MaxConnIdleTime time.Duration 0 Auto-close connections idle longer than this duration
MaxConnLifetime time.Duration 0 Auto-rotate connections older than this duration
Auth Auth empty NNTP authentication credentials (Username, Password)
TLSConfig *tls.Config nil TLS configuration (nil = plaintext connection)
ConnFactory ConnFactory nil Custom connection factory (overrides proxy/direct dial)
ProxyURL string "" SOCKS proxy URL (used when ConnFactory is nil)
Connection Lifecycle

Control connection behavior with lifecycle options:

  • MaxConnIdleTime: Automatically close connections that have been idle (no requests) longer than the specified duration. Useful for preventing stale connections.
  • MaxConnLifetime: Automatically rotate connections that have been active longer than the specified duration, regardless of usage. Useful for load balancing and preventing resource exhaustion.
  • Both options are disabled by default (0 value = no limit).
Proxy URL Formats

When ConnFactory is nil, the ProxyURL field supports SOCKS proxies:

  • SOCKS5 without auth: socks5://proxy.host:1080
  • SOCKS5 with auth: socks5://user:pass@proxy.host:1080
  • SOCKS4: socks4://proxy.host:1080
  • SOCKS4a: socks4a://proxy.host:1080

Advanced Topics

TLS & Custom Connections
// TLS configuration for NNTPS (port 563)
config := nntppool.ProviderConfig{
    Address: "news.example.com:563",
    TLSConfig: &tls.Config{
        MinVersion: tls.VersionTLS12,
        ServerName: "news.example.com",
    },
}

// Custom connection factory for advanced dialing
config.ConnFactory = func(ctx context.Context) (net.Conn, error) {
    dialer := &net.Dialer{Timeout: 30 * time.Second}
    conn, err := dialer.DialContext(ctx, "tcp", "news.example.com:563")
    if err != nil {
        return nil, err
    }
    // Apply custom socket options, wrapping, etc.
    return conn, nil
}
Automatic Features

The library includes several automatic features that require no configuration:

  • Health Monitoring: Providers are automatically checked every 60 seconds using the DATE command. Dead providers are marked inactive and automatically recovered when they become responsive again.

  • yEnc Decoding: All article bodies are automatically detected and decoded if they contain yEnc encoding. Includes CRC32 validation and support for multi-part articles with part metadata tracking.

  • Error Handling: Status code interpretation follows NNTP standards:

    • 2xx status codes indicate success
    • 430 indicates article not found (triggers failover to backup provider)
    • Other status codes indicate server or connection errors

Performance Considerations

  • Connection Pooling: Each provider maintains an independent connection pool. Connections are created lazily up to MaxConnections and reused across requests.

  • Concurrency Control: Callers control concurrency externally using mechanisms like errgroup.SetLimit(), worker pools, or semaphores. Provider-level connection limits handle internal throttling via MaxConnections and InflightPerConnection settings.

  • Streaming: Use BodyReader() or BodyAt() to avoid buffering large articles in memory. The streaming API decodes yEnc on-the-fly and writes directly to the destination.

  • TCP Buffers: All connections are automatically optimized with 8MB read buffers and 1MB write buffers for high-speed downloads.

  • Sequential Readers with io.Pipe: When implementing a sequential reader (io.Reader that must consume segments in order) using io.Pipe, download segment data to a buffer first, then copy to the pipe. Direct pipe writes can cause connection deadlock:

    1. Workers hold connections while blocked on pipe writes (pipes are synchronous)
    2. Pipe writes block until the reader consumes them sequentially
    3. Reader can't reach later pipes because connections are held by workers blocked on earlier pipes

    Solution: Download to buffer first, then write to pipe:

    var buf bytes.Buffer
    client.Body(ctx, id, &buf)  // Connection released here
    io.Copy(pipeWriter, &buf)   // May block, but connection is free
    
  • Health Checks: Provider health checks run every 60 seconds with minimal overhead (single DATE command per provider).

Examples

See working example programs in the repository:

  • examples/proxy_example.go: Complete examples of SOCKS proxy configuration (socks4, socks4a, socks5 with authentication)
  • cmd/nzb-downloader/: Full-featured NZB downloader with concurrent downloads, progress tracking, and metrics display

Testing

# Run all tests
go test -v ./...

# Run with race detector
go test -race ./...

# Run benchmarks
go test -bench=. -benchmem

License

MIT License


Full API Documentation: See pkg.go.dev for complete API reference and additional examples.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrProviderClosed = errors.New("provider closed")

ErrProviderClosed is returned when a request is made to a closed provider.

View Source
var ErrProviderUnavailable = errors.New("provider unavailable: all connections failed")

ErrProviderUnavailable is returned when all connections to a provider have failed. This error is retryable - the client should try other providers or retry later.

Functions

func IsArticleNotFound added in v3.1.1

func IsArticleNotFound(err error) bool

IsArticleNotFound checks if an error is an ArticleNotFoundError. Returns true if the error (or any error in the chain) indicates an article was not found.

Types

type ArticleNotFoundError added in v3.1.1

type ArticleNotFoundError struct {
	MessageID  string
	StatusCode int
	Status     string
}

ArticleNotFoundError indicates an article was not found (NNTP 430 status).

func (*ArticleNotFoundError) Error added in v3.1.1

func (e *ArticleNotFoundError) Error() string

type Auth

type Auth struct {
	Username string
	Password string
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient() *Client

func (*Client) AddProvider

func (c *Client) AddProvider(provider *Provider, tier ProviderType) error

func (*Client) Article

func (c *Client) Article(ctx context.Context, id string, w io.Writer) error

Article retrieves the header and body of an article by its message ID.

func (*Client) Body

func (c *Client) Body(ctx context.Context, id string, w io.Writer) error

Body retrieves the body of an article by its message ID.

func (*Client) BodyAt

func (c *Client) BodyAt(ctx context.Context, id string, w io.WriterAt) error

BodyAt retrieves the body of an article by its message ID, writing to an io.WriterAt.

func (*Client) BodyReader

func (c *Client) BodyReader(ctx context.Context, id string) (YencReader, error)

BodyReader retrieves the body of an article by its message ID as a stream. It returns a reader that provides access to yEnc headers as soon as they are parsed.

func (*Client) Close

func (c *Client) Close()

func (*Client) Date

func (c *Client) Date(ctx context.Context) error

func (*Client) Group

func (c *Client) Group(ctx context.Context, group string) (*Response, error)

Group selects a newsgroup. Note: In a connection pool, this selection only applies to the specific connection used for this request and is not guaranteed to persist for subsequent requests.

func (*Client) Head

func (c *Client) Head(ctx context.Context, id string) (*Response, error)

Head retrieves the headers of an article by its message ID.

func (*Client) Metrics

func (c *Client) Metrics() map[string]ProviderMetrics

Metrics returns metrics for all active providers.

func (*Client) Post added in v3.1.0

func (c *Client) Post(ctx context.Context, headers map[string]string, body io.Reader) (*Response, error)

Post posts an article to the NNTP server with the given headers and body. The headers map should contain standard NNTP headers like "From", "Newsgroups", "Subject", etc. The body reader provides the article content which will be transmitted with proper dot-stuffing.

NNTP response codes:

  • 240: Article posted successfully
  • 340: Send article to be posted (intermediate response)
  • 440: Posting not allowed
  • 441: Posting failed

func (*Client) PostYenc added in v3.1.0

func (c *Client) PostYenc(ctx context.Context, headers map[string]string, body io.Reader, opts *YencOptions) (*Response, error)

PostYenc posts an article with automatic yEnc encoding. The body reader provides the raw binary content which will be yEnc encoded before posting. The opts parameter specifies yEnc encoding options including filename, file size, and part information.

For single-part files, set opts.Part to 0 or 1 and opts.Total to 0 or 1. For multi-part files, set opts.Part and opts.Total appropriately, along with PartBegin and PartEnd.

NNTP response codes:

  • 240: Article posted successfully
  • 340: Send article to be posted (intermediate response)
  • 440: Posting not allowed
  • 441: Posting failed

func (*Client) RemoveProvider

func (c *Client) RemoveProvider(provider *Provider) error

func (*Client) Send

func (c *Client) Send(ctx context.Context, payload []byte, bodyWriter io.Writer) <-chan Response

func (*Client) SpeedTest

func (c *Client) SpeedTest(ctx context.Context, articleIDs []string) (SpeedTestStats, error)

SpeedTest performs a download speed test using the provided article IDs. It downloads articles concurrently using the pool's configured concurrency, discards the content, and returns performance statistics.

func (*Client) Stat

func (c *Client) Stat(ctx context.Context, id string) (*Response, error)

Stat checks if an article exists by its message ID.

type ConnFactory

type ConnFactory func(ctx context.Context) (net.Conn, error)

ConnFactory is used by Client to create connections.

type NNTPClient

type NNTPClient interface {
	// Provider management
	AddProvider(provider *Provider, tier ProviderType) error
	RemoveProvider(provider *Provider) error
	Close()

	// Article retrieval methods
	Body(ctx context.Context, id string, w io.Writer) error
	BodyReader(ctx context.Context, id string) (YencReader, error)
	BodyAt(ctx context.Context, id string, w io.WriterAt) error
	Article(ctx context.Context, id string, w io.Writer) error
	Head(ctx context.Context, id string) (*Response, error)
	Stat(ctx context.Context, id string) (*Response, error)
	Group(ctx context.Context, group string) (*Response, error)

	// Article posting methods
	Post(ctx context.Context, headers map[string]string, body io.Reader) (*Response, error)
	PostYenc(ctx context.Context, headers map[string]string, body io.Reader, opts *YencOptions) (*Response, error)

	// Advanced methods
	Send(ctx context.Context, payload []byte, bodyWriter io.Writer) <-chan Response
	Metrics() map[string]ProviderMetrics
	SpeedTest(ctx context.Context, articleIDs []string) (SpeedTestStats, error)
	Date(ctx context.Context) error
}

NNTPClient defines the public API for NNTP operations. The Client type implements this interface.

type NNTPConnection

type NNTPConnection struct {
	Greeting NNTPResponse
	// contains filtered or unexported fields
}

func NewNNTPConnection

func NewNNTPConnection(ctx context.Context, addr string, tlsConfig *tls.Config, inflightLimit int, reqCh <-chan *Request, auth Auth, maxIdleTime time.Duration, maxLifeTime time.Duration) (*NNTPConnection, error)

func (*NNTPConnection) Close

func (c *NNTPConnection) Close() error

func (*NNTPConnection) Done

func (c *NNTPConnection) Done() <-chan struct{}

func (*NNTPConnection) Ready added in v3.2.0

func (c *NNTPConnection) Ready() <-chan struct{}

func (*NNTPConnection) Run

func (c *NNTPConnection) Run()

type NNTPResponse

type NNTPResponse struct {
	BytesDecoded  int
	BytesConsumed int
	Lines         []string
	Format        rapidyenc.Format
	FileName      string
	FileSize      int64
	Part          int64
	PartBegin     int64
	PartSize      int64
	EndSize       int64
	Total         int64
	ExpectedCRC   uint32
	Message       string
	State         rapidyenc.State
	StatusCode    int
	CRC           uint32

	OnYencHeader func(*YencHeader)
	// contains filtered or unexported fields
}

func (*NNTPResponse) Feed

func (r *NNTPResponse) Feed(buf []byte, out io.Writer) (consumed int, done bool, err error)

Feed consumes raw NNTP protocol bytes from buf, writing any decoded payload bytes to out. It returns (bytesConsumedFromBuf, done, error).

type Provider

type Provider struct {
	Host string
	// contains filtered or unexported fields
}

func NewProvider

func NewProvider(ctx context.Context, config ProviderConfig) (*Provider, error)

func (*Provider) Close

func (c *Provider) Close() error

Close cancels the provider, closes its request channel, and waits for all connections to stop.

func (*Provider) Date

func (c *Provider) Date(ctx context.Context) error

Date checks the server time.

func (*Provider) Metrics

func (c *Provider) Metrics() ProviderMetrics

func (*Provider) Send

func (c *Provider) Send(ctx context.Context, payload []byte, bodyWriter io.Writer) <-chan Response

func (*Provider) SendRequest

func (c *Provider) SendRequest(req *Request) <-chan Response

func (*Provider) ThrottleConnections added in v3.2.0

func (c *Provider) ThrottleConnections()

ThrottleConnections temporarily reduces max connections to current active count. Called when the server returns a "max connections exceeded" error.

type ProviderConfig

type ProviderConfig struct {
	Address               string
	MaxConnections        int
	InitialConnections    int
	InflightPerConnection int
	MaxConnIdleTime       time.Duration
	MaxConnLifetime       time.Duration
	// HealthCheckPeriod is how often the centralized health monitor scans connections
	// for idle/lifetime expiration. Default: 1s. Set to 0 to disable health checks
	// (connections will then manage their own timeouts via timers).
	HealthCheckPeriod time.Duration
	Auth              Auth
	TLSConfig         *tls.Config
	ConnFactory       ConnFactory
	// ProxyURL configures SOCKS proxy connection (e.g., "socks5://host:port" or "socks5://user:pass@host:port").
	// Supports socks4, socks4a, and socks5 protocols.
	// Only used when ConnFactory is nil.
	ProxyURL string
}

type ProviderMetrics

type ProviderMetrics struct {
	Host              string
	ActiveConnections int
	TotalBytesRead    uint64
	TotalBytesWritten uint64
	ThroughputMB      float64 // MB/s
}

type ProviderType

type ProviderType int
const (
	ProviderPrimary ProviderType = iota
	ProviderBackup
)

type Request

type Request struct {
	Ctx context.Context

	Payload []byte
	RespCh  chan Response

	// Optional: decoded body bytes are streamed here. If nil, they are buffered into Response.Body.
	BodyWriter io.Writer

	// Optional: callback for when yEnc headers are parsed
	OnYencHeader func(*YencHeader)
}

type Response

type Response struct {
	StatusCode int
	Status     string

	// For non-body multiline responses (CAPABILITIES, etc).
	Lines []string

	// Decoded payload bytes (only if Request.BodyWriter == nil).
	Body bytes.Buffer

	// Decoder metadata/status gathered while parsing.
	Meta NNTPResponse

	Err     error
	Request *Request
}

type SpeedTestStats

type SpeedTestStats struct {
	TotalBytes     int64
	Duration       time.Duration
	BytesPerSecond float64
	SuccessCount   int32
	FailureCount   int32
}

SpeedTestStats contains metrics from a speed test run.

type YencHeader

type YencHeader struct {
	FileName  string
	FileSize  int64
	Part      int64
	PartBegin int64
	PartSize  int64
	Total     int64
}

type YencOptions added in v3.1.0

type YencOptions struct {
	FileName  string // Required: Name of the file being encoded
	FileSize  int64  // Required: Total size of the original file
	Part      int64  // For multi-part files (1-based), 0 or 1 means single-part
	Total     int64  // For multi-part files, total number of parts
	PartBegin int64  // For multi-part files, beginning byte offset (1-based)
	PartEnd   int64  // For multi-part files, ending byte offset (1-based, inclusive)
}

YencOptions contains options for yEnc encoding when posting articles.

type YencReader

type YencReader interface {
	io.ReadCloser
	YencHeaders() *YencHeader
}

Directories

Path Synopsis
cmd
nzb-downloader command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL