socket

package module
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 26, 2025 License: Apache-2.0 Imports: 10 Imported by: 0

README

socket

A simple, high-performance TCP server framework for Go.

Go Reference CI

Features

  • Simple API - Easy to use with functional options pattern
  • Custom Codec - Pluggable message encoding/decoding via io.Reader
  • Graceful Shutdown - Context-based cancellation support
  • Idle Timeout - Automatic read/write deadline management for connection health
  • Error Handling - Flexible error handling with Disconnect or Continue actions
  • Structured Logging - Built-in slog integration

Requirements

  • Go 1.23+

Installation

go get github.com/Zereker/socket

Quick Start

package main

import (
    "context"
    "io"
    "log"
    "net"

    "github.com/Zereker/socket"
)

// Define your message type
type Message struct {
    Data []byte
}

func (m Message) Length() int  { return len(m.Data) }
func (m Message) Body() []byte { return m.Data }

// Implement the Codec interface
type SimpleCodec struct{}

func (c *SimpleCodec) Decode(r io.Reader) (socket.Message, error) {
    buf := make([]byte, 1024)
    n, err := r.Read(buf)
    if err != nil {
        return nil, err
    }
    return Message{Data: buf[:n]}, nil
}

func (c *SimpleCodec) Encode(msg socket.Message) ([]byte, error) {
    return msg.Body(), nil
}

// Implement the Handler interface
type EchoHandler struct{}

func (h *EchoHandler) Handle(tcpConn *net.TCPConn) {
    conn, err := socket.NewConn(tcpConn,
        socket.CustomCodecOption(&SimpleCodec{}),
        socket.OnMessageOption(func(msg socket.Message) error {
            // Echo the message back
            return conn.Write(msg)
        }),
    )
    if err != nil {
        log.Printf("failed to create connection: %v", err)
        return
    }
    conn.Run(context.Background())
}

func main() {
    addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:8080")
    server, err := socket.New(addr)
    if err != nil {
        log.Fatal(err)
    }

    log.Println("server listening on", addr)
    server.Serve(context.Background(), &EchoHandler{})
}

Configuration Options

Option Description Default
CustomCodecOption(codec) Set message codec (required) -
OnMessageOption(handler) Set message handler (required) -
OnErrorOption(handler) Set error handler Disconnect on error
IdleTimeoutOption(duration) Set idle timeout for read/write deadlines 30s
BufferSizeOption(size) Set send channel buffer size 1
MessageMaxSize(size) Set max message size 1MB
LoggerOption(logger) Set custom logger slog default

Note: The idle timeout sets TCP read/write deadlines but does not send ping/pong packets. For active connection health checking, implement heartbeat messages in your application protocol.

Error Handling

Control how errors are handled with OnErrorOption:

socket.OnErrorOption(func(err error) socket.ErrorAction {
    if isTemporaryError(err) {
        return socket.Continue  // Suppress error and continue
    }
    return socket.Disconnect    // Close the connection
})

Connection Management

// Gracefully close the connection
conn.Close()

// Check if connection is closed
if conn.IsClosed() {
    // Handle closed connection
}

// Get remote address
addr := conn.Addr()

Write Methods

Three ways to send messages with different blocking behaviors:

// Non-blocking write (fire-and-forget)
// Returns ErrBufferFull immediately if channel is full
// Best for: non-critical data, custom backpressure handling
err := conn.Write(msg)
if errors.Is(err, socket.ErrBufferFull) {
    // Handle backpressure: drop, retry, or use blocking write
}

// Blocking write with context cancellation
// Waits for buffer space, respects context timeout/cancellation
// Best for: critical messages that must be delivered
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
conn.WriteBlocking(ctx, msg)

// Write with timeout
// Waits up to the specified duration for buffer space
// Best for: simple timeout without context management
conn.WriteTimeout(msg, 5*time.Second)

All write methods return ErrConnectionClosed if the connection is closed.

Backpressure Handling

When ErrBufferFull is returned, it indicates the receiver is not consuming messages fast enough. Strategies:

  • Drop: Acceptable for metrics, heartbeats, or non-critical updates
  • Retry with backoff: For important but delay-tolerant messages
  • Switch to blocking: Use WriteBlocking when delivery is critical
  • Flow control: Implement application-level rate limiting

Custom Logger

Implement the Logger interface or use slog:

type Logger interface {
    Debug(msg string, args ...any)
    Info(msg string, args ...any)
    Warn(msg string, args ...any)
    Error(msg string, args ...any)
}

License

MIT License - see LICENSE for details.

Documentation

Overview

Package socket provides a simple TCP server framework for Go. It supports custom message encoding/decoding, asynchronous I/O operations, and connection management with idle timeout monitoring.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrInvalidCodec is returned when no codec is provided.
	ErrInvalidCodec = errors.New("invalid codec callback")
	// ErrInvalidOnMessage is returned when no message handler is provided.
	ErrInvalidOnMessage = errors.New("invalid on message callback")
	// ErrMessageTooLarge is returned when a message exceeds the maximum allowed size.
	ErrMessageTooLarge = errors.New("message too large")
)

Errors returned by connection operations.

View Source
var ErrBufferFull = errors.New("send buffer full")

ErrBufferFull is returned when the send buffer is full and cannot accept more messages. This error indicates backpressure - the receiver is not consuming messages fast enough. Recommended handling strategies:

  • Drop the message (for non-critical data like metrics)
  • Use WriteBlocking or WriteTimeout to wait for buffer space
  • Implement application-level flow control
View Source
var ErrConnectionClosed = errors.New("connection closed")

ErrConnectionClosed is returned when operating on a closed connection.

Functions

This section is empty.

Types

type Codec

type Codec interface {
	// Decode reads and decodes a complete message from the reader.
	// The implementation should read exactly the bytes needed for one message.
	// This handles TCP packet fragmentation by allowing the codec to control
	// how many bytes are read.
	Decode(r io.Reader) (Message, error)
	// Encode encodes a Message into raw bytes for transmission.
	Encode(Message) ([]byte, error)
}

Codec is the interface for message encoding and decoding. Applications should implement this interface to define their own message serialization format (e.g., JSON, Protocol Buffers, etc.).

The Decode method reads from an io.Reader, which allows the codec to handle TCP stream reassembly by reading exactly the number of bytes needed for a complete message. This solves the TCP packet fragmentation problem.

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn represents a client connection to a TCP server. It manages the underlying TCP connection, message encoding/decoding, and provides read/write loops for asynchronous communication.

func NewConn

func NewConn(conn *net.TCPConn, opt ...Option) (*Conn, error)

NewConn creates a new connection wrapper around the given TCP connection. It applies the provided options and validates them before returning. Returns an error if required options (codec, onMessage) are missing.

func (*Conn) Addr

func (c *Conn) Addr() net.Addr

Addr returns the remote address of the connection.

func (*Conn) Close added in v1.0.1

func (c *Conn) Close() error

Close gracefully closes the connection. It cancels the context and closes the underlying TCP connection. Safe to call multiple times.

func (*Conn) IsClosed added in v1.0.1

func (c *Conn) IsClosed() bool

IsClosed returns true if the connection has been closed.

func (*Conn) Run

func (c *Conn) Run(ctx context.Context) error

Run starts the connection's read and write loops. It creates two goroutines for concurrent reading and writing, and blocks until an error occurs or the context is canceled. The connection is automatically closed when Run returns.

func (*Conn) Write

func (c *Conn) Write(message Message) error

Write sends a message through the connection without blocking (fire-and-forget). The message is encoded using the configured codec and queued for sending.

Returns:

  • nil: message was successfully queued (not yet sent)
  • ErrBufferFull: send buffer is full, message was NOT queued
  • ErrConnectionClosed: connection is closed
  • encoding error: if codec.Encode fails

Use this method when:

  • You can tolerate message loss under backpressure
  • You have your own retry/backpressure logic
  • Low latency is critical and blocking is unacceptable

For guaranteed delivery, use WriteBlocking or WriteTimeout instead.

func (*Conn) WriteBlocking

func (c *Conn) WriteBlocking(ctx context.Context, message Message) error

WriteBlocking sends a message through the connection, blocking until the message is queued or the context is canceled. This is the safest write method for guaranteed delivery.

Returns:

  • nil: message was successfully queued
  • context.Canceled or context.DeadlineExceeded: context was canceled
  • ErrConnectionClosed: connection is closed
  • encoding error: if codec.Encode fails

Use this method when:

  • Message delivery is critical
  • You have proper timeout handling via context
  • Blocking is acceptable for your use case

func (*Conn) WriteTimeout

func (c *Conn) WriteTimeout(message Message, timeout time.Duration) error

WriteTimeout sends a message through the connection with a timeout. This provides a middle ground between Write (non-blocking) and WriteBlocking.

Returns:

  • nil: message was successfully queued
  • ErrBufferFull: timeout expired before message could be queued
  • ErrConnectionClosed: connection is closed
  • encoding error: if codec.Encode fails

Use this method when:

  • You want to wait for buffer space but with a time limit
  • You don't have an existing context to pass

type ErrorAction

type ErrorAction int

ErrorAction defines the action to take when an error occurs.

const (
	// Disconnect closes the connection when an error occurs.
	Disconnect ErrorAction = iota
	// Continue suppresses the error and continues processing.
	Continue
)

type Handler

type Handler interface {
	// Handle is called for each new connection.
	// The implementation is responsible for managing the connection.
	Handle(conn *net.TCPConn)
}

Handler is the interface for handling incoming TCP connections. Implementations should handle the connection lifecycle and message processing.

type Logger

type Logger interface {
	// Debug logs a debug-level message with optional key-value pairs.
	Debug(msg string, args ...any)
	// Info logs an info-level message with optional key-value pairs.
	Info(msg string, args ...any)
	// Warn logs a warning-level message with optional key-value pairs.
	Warn(msg string, args ...any)
	// Error logs an error-level message with optional key-value pairs.
	Error(msg string, args ...any)
}

Logger is the interface for structured logging. It is designed to be compatible with *slog.Logger from the standard library. Applications can provide their own implementation or use the default slog logger.

type Message

type Message interface {
	// Length returns the length of the message body.
	Length() int
	// Body returns the raw message data.
	Body() []byte
}

Message is the interface for messages transmitted over the connection. Implementations should provide the message length and body.

type Option

type Option func(*options)

Option is a function that configures connection options.

func BufferSizeOption

func BufferSizeOption(size int) Option

BufferSizeOption returns an Option that sets the size of the send channel buffer. A larger buffer allows more messages to be queued before blocking.

func CustomCodecOption

func CustomCodecOption(codec Codec) Option

CustomCodecOption returns an Option that sets the message codec. The codec is required and must be provided before creating a connection.

func IdleTimeoutOption added in v1.1.0

func IdleTimeoutOption(timeout time.Duration) Option

IdleTimeoutOption returns an Option that sets the idle timeout for connections. If no data is received within the timeout period, the connection will be closed. The actual read/write deadline is set to idleTimeout * 2 to allow for some network latency. Default is 30 seconds.

Note: This is NOT a heartbeat mechanism that sends ping/pong packets. It only sets TCP read/write deadlines. If you need active connection health checking, implement ping/pong messages in your application protocol.

func LoggerOption

func LoggerOption(logger Logger) Option

LoggerOption returns an Option that sets the logger. If not set, the default slog logger will be used.

func MessageMaxSize

func MessageMaxSize(size int) Option

MessageMaxSize returns an Option that sets the maximum message buffer size. Messages larger than this size cannot be received.

func OnErrorOption

func OnErrorOption(cb func(error) ErrorAction) Option

OnErrorOption returns an Option that sets the error callback. The callback is invoked when a read/write error occurs. Return Disconnect to close the connection, or Continue to suppress the error.

func OnMessageOption

func OnMessageOption(cb func(Message) error) Option

OnMessageOption returns an Option that sets the message handler callback. This callback is required and is invoked for each received message.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server represents a TCP server that listens for incoming connections.

func New

func New(addr *net.TCPAddr, opts ...ServerOption) (*Server, error)

New creates a new TCP server bound to the specified address. Returns an error if the address cannot be bound.

func (*Server) Addr

func (s *Server) Addr() net.Addr

Addr returns the listener's network address.

func (*Server) Close

func (s *Server) Close() error

Close stops the server by closing the underlying listener. If a shutdown timeout is configured, Close() bypasses the remaining timeout. Any blocked Accept calls will return with an error.

func (*Server) Serve

func (s *Server) Serve(ctx context.Context, handler Handler) error

Serve starts accepting connections and dispatching them to the handler. It blocks until the context is canceled or an unrecoverable error occurs. When the context is canceled, it stops accepting new connections gracefully. If ServerShutdownTimeoutOption is set, the server waits up to the specified duration before stopping, allowing existing handlers to complete. Call Close() to bypass the timeout and shut down immediately.

type ServerOption added in v1.1.0

type ServerOption func(*Server)

ServerOption configures a Server.

func ServerLoggerOption added in v1.1.0

func ServerLoggerOption(logger Logger) ServerOption

ServerLoggerOption sets the logger for the server.

func ServerShutdownTimeoutOption added in v1.1.0

func ServerShutdownTimeoutOption(timeout time.Duration) ServerOption

ServerShutdownTimeoutOption sets the graceful shutdown timeout. When the context is canceled, the server will wait up to this duration before closing the listener. This gives existing connections time to complete. Default is 0 (immediate shutdown).

Note: This only delays listener closure. For full graceful shutdown with connection draining, track connections at the application level and cancel them with the context passed to Conn.Run().

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL