connpool

package
v1.0.3
Published: May 16, 2024 License: Apache-2.0, BSD-2-Clause, BSD-3-Clause, + 2 more Imports: 15 Imported by: 0

README

Background

When a client calls a server over TCP, the cost of the three-way handshake needed to establish each connection has to be taken into account. TCP clients therefore usually establish connections in advance or when the first request is made, and instead of closing a connection after use they keep it for later reuse.

A connection pool encapsulates this behavior.

Principle

The pool maintains a sync.Map of connection pools. The key is encoded from <network, address, protocol>, and the value is the ConnectionPool holding the connections established to that target address, which keeps its idle connections in a linked list. In short connection mode, the transport layer closes the connection after each RPC call. In connection pool mode, a used connection is returned to the pool so that it can be taken out again the next time one is needed.

To achieve the above purposes, the connection pool needs to have the following functions:

  • Provide available connections, including creating new connections and reusing idle connections.
  • Recycle connections released by the upper layer and manage them as idle connections.
  • Manage the idle connections in the connection pool, including the selection policy for reusing connections, and health checks on idle connections.
  • Adjust the running parameters of the connection pool according to user configuration.

Design Implementation

The overall code structure is shown in the figure below.

[Figure: design implementation]

Initialize the Connection Pool

NewConnectionPool creates a connection pool and accepts Option values to modify its parameters. If no options are passed, the default values are used. Dial is the default method for creating connections, and each ConnectionPool generates DialOptions based on its GetOptions to establish a connection to the corresponding target.

func NewConnectionPool(opt ...Option) Pool {
  opts := &Options{
    MaxIdle: defaultMaxIdle,
    IdleTimeout: defaultIdleTimeout,
    DialTimeout: defaultDialTimeout,
    Dial: Dial,
  }
  for _, o := range opt {
    o(opts)
  }
  return &pool{
    opts: opts,
    connectionPools: new(sync.Map),
  }
}
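
The defaults-then-options flow above can be tried in isolation. The following is a minimal sketch of the functional options pattern, not the package's code: poolOptions, option, withMaxIdle, withIdleTimeout, newPoolOptions and the default values are all hypothetical stand-ins chosen for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// poolOptions is a hypothetical stand-in for connpool.Options.
type poolOptions struct {
	MaxIdle     int
	IdleTimeout time.Duration
	DialTimeout time.Duration
}

// option mutates poolOptions, mirroring the shape of connpool.Option.
type option func(*poolOptions)

func withMaxIdle(n int) option {
	return func(o *poolOptions) { o.MaxIdle = n }
}

func withIdleTimeout(d time.Duration) option {
	return func(o *poolOptions) { o.IdleTimeout = d }
}

// newPoolOptions starts from defaults and applies each option in order.
// The default values are illustrative, not the package's real defaults.
func newPoolOptions(opts ...option) *poolOptions {
	o := &poolOptions{
		MaxIdle:     64,
		IdleTimeout: 50 * time.Second,
		DialTimeout: 200 * time.Millisecond,
	}
	for _, apply := range opts {
		apply(o)
	}
	return o
}

func main() {
	o := newPoolOptions(withMaxIdle(8), withIdleTimeout(30*time.Second))
	fmt.Println(o.MaxIdle, o.IdleTimeout, o.DialTimeout) // 8 30s 200ms
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field, and any field without an option keeps its default.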

Get a Connection

A connection can be obtained through pool.Get. For reference, see the implementation in client_transport_tcp.go:

// Get
getOpts := connpool.NewGetOptions()
getOpts.WithContext(ctx)
getOpts.WithFramerBuilder(opts.FramerBuilder)
getOpts.WithDialTLS(opts.TLSCertFile, opts.TLSKeyFile, opts.CACertFile, opts.TLSServerName)
getOpts.WithLocalAddr(opts.LocalAddr)
getOpts.WithDialTimeout(opts.DialTimeout)
getOpts.WithProtocol(opts.Protocol)
conn, err = opts.Pool.Get(opts.Network, opts.Address, getOpts)

ConnPool exposes only the Get interface to the outside world, so that user error cannot corrupt the state of the connection pool.

Get looks up the ConnectionPool by <network, address, protocol>. If none exists yet, one is created first. Concurrency control is applied here so that the same ConnectionPool is not created more than once. The core code is as follows:

func (p *pool) Get(network string, address string, opts GetOptions) (net.Conn, error) {
  // ...
  key := getNodeKey(network, address, opts.Protocol)
  if v, ok := p.connectionPools.Load(key); ok {
    return v.(*ConnectionPool).Get(ctx)
  }
  // create newPool...
  v, ok := p.connectionPools.LoadOrStore(key, newPool)
  if !ok {
    // init newPool...
    return newPool.Get(ctx)
  }
  return v.(*ConnectionPool).Get(ctx)
}
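
The create-once behavior of Get rests on sync.Map.LoadOrStore. The sketch below isolates that pattern under assumed names: subPool, registry and nodeKey are hypothetical, and the key encoding shown is an illustration, not necessarily what getNodeKey produces.

```go
package main

import (
	"fmt"
	"sync"
)

// subPool is a hypothetical stand-in for the per-target ConnectionPool.
type subPool struct {
	key string
}

// registry maps a target key to its subPool, like pool.connectionPools.
type registry struct {
	pools sync.Map
}

// nodeKey joins the three fields; the real getNodeKey encoding may differ.
func nodeKey(network, address, protocol string) string {
	return network + "_" + address + "_" + protocol
}

// get returns the subPool for the key, storing a new one at most once even
// when many goroutines race on the same key.
func (r *registry) get(network, address, protocol string) *subPool {
	key := nodeKey(network, address, protocol)
	if v, ok := r.pools.Load(key); ok {
		return v.(*subPool) // fast path: already created
	}
	candidate := &subPool{key: key}
	// LoadOrStore keeps whichever value was stored first; a goroutine that
	// loses the race discards its candidate and uses the stored value.
	v, _ := r.pools.LoadOrStore(key, candidate)
	return v.(*subPool)
}

func main() {
	r := &registry{}
	var wg sync.WaitGroup
	results := make([]*subPool, 16)
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = r.get("tcp", "127.0.0.1:8000", "trpc")
		}(i)
	}
	wg.Wait()
	same := true
	for _, p := range results {
		same = same && p == results[0]
	}
	fmt.Println("all goroutines share one pool:", same)
}
```

The second return value of LoadOrStore tells the caller whether its candidate was stored, which is the point where the code above runs the one-time initialization of the new pool.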

After obtaining the ConnectionPool, Get attempts to obtain a connection. It first needs a token. The token is a channel used for concurrency control whose buffer length is MaxActive, the number of connections that users can hold at the same time. When an active connection is returned to the connection pool or closed, its token is released. If Wait is true and no token is available, the call blocks until a token is released or the context is done (for example, on timeout). If Wait is false, ErrPoolLimit is returned immediately when no token is available. If MaxActive is not positive, no limit is applied.

func (p *ConnectionPool) getToken(ctx context.Context) error {
  if p.MaxActive <= 0 {
    return nil
  }
    
  if p.Wait {
    select {
    case p.token <- struct{}{}:
      return nil
    case <-ctx.Done():
      return ctx.Err()
    }
  } else {
    select {
    case p.token <- struct{}{}:
      return nil
    default:
      return ErrPoolLimit
    }
  }
}

func (p *ConnectionPool) freeToken() {
  if p.MaxActive <= 0 {
    return
  }
  <-p.token
}

After the token is obtained, the pool first tries to take an idle connection from the idle list. If none is available, a newly created connection is returned.

Initialize the ConnectionPool

The ConnectionPool is initialized when it is first created during Get. Initialization consists mainly of starting the check goroutine and pre-warming idle connections up to MinIdle.

KeepMinIdles

A sudden surge in business traffic may result in the creation of a large number of new connections. Creating a connection is time-consuming and may cause request timeouts. Pre-creating some idle connections warms the pool up: when the ConnectionPool is created, MinIdle connections are established in advance.
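
The warm-up idea can be sketched as a loop that dials until the idle list reaches MinIdle. The types and names below (conn, warmPool, keepMinIdles) are hypothetical, and the sketch dials sequentially and stops at the first error, which may differ from how the package actually replenishes connections.

```go
package main

import (
	"errors"
	"fmt"
)

// conn is a hypothetical stand-in for a pooled network connection.
type conn struct{ id int }

// warmPool holds idle connections and a dial function to create new ones.
type warmPool struct {
	minIdle int
	idle    []*conn
	dial    func() (*conn, error)
}

// keepMinIdles dials until the idle list holds minIdle connections and
// returns how many it added. It stops at the first dial error so that an
// unreachable backend does not cause a tight retry loop.
func (p *warmPool) keepMinIdles() (int, error) {
	added := 0
	for len(p.idle) < p.minIdle {
		c, err := p.dial()
		if err != nil {
			return added, err
		}
		p.idle = append(p.idle, c)
		added++
	}
	return added, nil
}

func main() {
	next := 0
	p := &warmPool{
		minIdle: 3,
		dial: func() (*conn, error) {
			next++
			return &conn{id: next}, nil
		},
	}
	added, err := p.keepMinIdles() // initial warm-up
	fmt.Println(added, err, len(p.idle))

	p.idle = p.idle[:1]           // simulate two idle connections being taken
	added, err = p.keepMinIdles() // periodic replenishment
	fmt.Println(added, err, len(p.idle))

	p.dial = func() (*conn, error) { return nil, errors.New("dial failed") }
	p.idle = nil
	added, err = p.keepMinIdles()
	fmt.Println(added, err, len(p.idle))
}
```

The same function serves both the initial warm-up and the periodic replenishment, since each only needs to top the idle list back up to the minimum.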

Check Goroutine

The ConnectionPool periodically performs the following checks:

  • Idle connection health check

    The default health check strategy is shown in the figure below. The health check scans the idle list, and any connection that fails the check is closed directly. The check first verifies that the connection is still alive, and then whether it has exceeded IdleTimeout or MaxConnLifetime. The health check policy can be customized with WithHealthChecker.

    In addition to the periodic check, a check is performed each time an idle connection is retrieved from the idle list. In that case isFast is set to true and only the liveness check is performed.

    func (p *ConnectionPool) defaultChecker(pc *PoolConn, isFast bool) bool {
      if pc.isRemoteError(isFast) {
        return false
      }
      if isFast {
        return true
      }
      if p.IdleTimeout > 0 && pc.t.Add(p.IdleTimeout).Before(time.Now()) {
        return false
      }
      if p.MaxConnLifetime > 0 && pc.created.Add(p.MaxConnLifetime).Before(time.Now()) {
        return false
      }
      return true
    }
    

    The idle connection check time of the connection pool should generally be configurable so that it can be coordinated with the server side (especially across different frameworks), because poor coordination causes problems. For example, if both the pool and the server use a 1-minute idle check time, the server may close a large batch of idle connections before the client has detected them. Many sends then fail, and the failures can only be resolved by retries in the upper layer. A better approach is to make the server's idle check time longer than the pool's, so that as far as possible the client closes idle connections first and never retrieves a connection that the server has already closed without its knowledge.

    One possible optimization is to perform a non-blocking read through a system call each time a connection is retrieved, which can determine whether the connection has been closed by the peer. This works on Unix/Linux platforms but ran into problems on Windows, so the optimization was removed from tRPC-Go.

  • Idle connection count check:

    Same as KeepMinIdles, periodically replenishes the idle connections to MinIdle.

  • ConnectionPool idle check:

    The transport does not actively close a ConnectionPool, so its background check goroutine would otherwise keep running with nothing to do. Setting poolIdleTimeout enables a periodic check that automatically closes a ConnectionPool that has not been used for that long, provided that no connections are in use by users.

Connection life cycle

MinIdle is the minimum number of idle connections maintained by the ConnectionPool. It is replenished during initialization and by the periodic checks. When a user requests a connection, the pool first tries to take one from the idle connections; if none is available, a new one is created. After the user finishes the request, the connection is returned to the ConnectionPool, with three possible outcomes:

  • If the number of idle connections exceeds MaxIdle, one idle connection is closed according to the elimination strategy.
  • If forceClose is set to true for the connection pool, the connection is closed directly instead of being returned to the ConnectionPool.
  • Otherwise, the connection is added to the idle connection list.

If a read/write error occurs while the user is using a connection, the connection is closed directly. A connection that fails the availability check is also closed directly.

[Figure: connection life cycle]

Idle Connection Management Policy

The connection pool has two strategies for selecting and eliminating idle connections: FIFO and LIFO. The strategy is controlled by PushIdleConnToTail and should be chosen according to the actual characteristics of the business.

  • FIFO ensures that connections are used evenly. However, if the caller's request rate is low but each connection happens to be reused just before it reaches the idle timeout, no connection is ever released, and the pool keeps more connections than it needs.
  • LIFO prefers the connection at the top of the stack, so rarely used idle connections at the bottom are eliminated first.
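
// addIdleConn dials a new connection and places it on the idle list.
func (p *ConnectionPool) addIdleConn(ctx context.Context) error {
  c, err := p.dial(ctx)
  if err != nil {
    return err
  }
  pc := p.newPoolConn(c)
  if !p.PushIdleConnToTail {
    p.idle.pushHead(pc)
  } else {
    p.idle.pushTail(pc)
  }
  return nil
}

// getIdleConn always takes from the head of the idle list.
func (p *ConnectionPool) getIdleConn() *PoolConn {
  for p.idle.head != nil {
    pc := p.idle.head
    p.idle.popHead()
    // ...
  }
  // ...
}

// put pushes pc onto the idle list (head for LIFO, tail for FIFO) and
// takes a connection off the tail when the list is full.
func (p *ConnectionPool) put(pc *PoolConn, forceClose bool) error {
  if !p.closed && !forceClose {
    if !p.PushIdleConnToTail {
      p.idle.pushHead(pc)
    } else {
      p.idle.pushTail(pc)
    }
    if p.idleSize >= p.MaxIdle {
      pc = p.idle.tail
      p.idle.popTail()
    }
  }
  // ...
}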
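
The effect of the two strategies on reuse order is easy to observe in a small simulation. The sketch below uses container/list in place of the package's own linked list; idleList, newIdleList and reuseOrder are hypothetical names, and connections are represented by plain strings.

```go
package main

import (
	"container/list"
	"fmt"
)

// idleList always takes from the head; pushToTail decides where returned
// connections go, which yields FIFO (tail) or LIFO (head) reuse.
type idleList struct {
	l          *list.List
	pushToTail bool
}

func newIdleList(pushToTail bool) *idleList {
	return &idleList{l: list.New(), pushToTail: pushToTail}
}

// put returns a connection (represented by its name) to the idle list.
func (q *idleList) put(name string) {
	if q.pushToTail {
		q.l.PushBack(name)
		return
	}
	q.l.PushFront(name)
}

// get removes and returns the head, or "" when the list is empty.
func (q *idleList) get() string {
	front := q.l.Front()
	if front == nil {
		return ""
	}
	return q.l.Remove(front).(string)
}

// reuseOrder simulates a low-traffic caller: three connections are idle and
// each request takes one and immediately returns it.
func reuseOrder(pushToTail bool, requests int) []string {
	q := newIdleList(pushToTail)
	for _, name := range []string{"c1", "c2", "c3"} {
		q.put(name)
	}
	var used []string
	for i := 0; i < requests; i++ {
		name := q.get()
		used = append(used, name)
		q.put(name)
	}
	return used
}

func main() {
	fmt.Println("FIFO:", reuseOrder(true, 4))  // FIFO: [c1 c2 c3 c1]
	fmt.Println("LIFO:", reuseOrder(false, 4)) // LIFO: [c3 c3 c3 c3]
}
```

Under FIFO every connection keeps getting touched, so none of them ever looks idle. Under LIFO the same connection serves all requests while the others age and become eligible for elimination.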

Documentation

Overview

Package connpool provides the connection pool.

Index

Constants

This section is empty.

Variables

var (
	ErrPoolLimit  = errors.New("connection pool limit")  // ErrPoolLimit number of connections exceeds the limit error.
	ErrPoolClosed = errors.New("connection pool closed") // ErrPoolClosed connection pool closed error.
	ErrConnClosed = errors.New("conn closed")            // ErrConnClosed connection closed.
	ErrNoDeadline = errors.New("dial no deadline")       // ErrNoDeadline has no deadline set.
	ErrConnInPool = errors.New("conn already in pool")   // ErrConnInPool connection is already in the pool.
)

Connection pool errors.

var DefaultConnectionPool = NewConnectionPool()

DefaultConnectionPool is the default connection pool, replaceable.

Functions

func Dial

func Dial(opts *DialOptions) (net.Conn, error)

Dial establishes a connection using the given DialOptions.

Types

type ConnectionPool

type ConnectionPool struct {
	Dial        func(context.Context) (net.Conn, error) // initialize the connection.
	MinIdle     int                                     // Minimum number of idle connections.
	MaxIdle     int                                     // Maximum number of idle connections, 0 means no limit.
	MaxActive   int                                     // Maximum number of active connections, 0 means no limit.
	IdleTimeout time.Duration                           // idle connection timeout.
	// Whether to wait when the maximum number of active connections is reached.
	Wait            bool
	MaxConnLifetime time.Duration // Maximum lifetime of the connection.

	PushIdleConnToTail bool // if true, ConnectionPool.put pushes idle connections to the tail of the idle list.
	// contains filtered or unexported fields
}

ConnectionPool is the connection pool.

func (*ConnectionPool) Close

func (p *ConnectionPool) Close() error

Close releases the connection.

func (*ConnectionPool) Get

func (p *ConnectionPool) Get(ctx context.Context) (*PoolConn, error)

Get gets the connection from the connection pool.

func (*ConnectionPool) RegisterChecker

func (p *ConnectionPool) RegisterChecker(interval time.Duration, checker HealthChecker)

RegisterChecker registers the idle connection check method.

type DialFunc

type DialFunc func(opts *DialOptions) (net.Conn, error)

DialFunc connects to an endpoint with the information in options.

type DialOptions

type DialOptions struct {
	Network       string
	Address       string
	LocalAddr     string
	Timeout       time.Duration
	CACertFile    string // ca certificate.
	TLSCertFile   string // client certificate.
	TLSKeyFile    string // client secret key.
	TLSServerName string // The client verifies the server's service name,
	// if not filled in, it defaults to the http hostname.
	IdleTimeout time.Duration
}

DialOptions request parameters.

type GetOptions

type GetOptions struct {
	FramerBuilder codec.FramerBuilder
	CustomReader  func(io.Reader) io.Reader
	Ctx           context.Context

	CACertFile    string // ca certificate.
	TLSCertFile   string // client certificate.
	TLSKeyFile    string // client secret key.
	TLSServerName string // The client verifies the server's service name,

	LocalAddr   string        // The local address when establishing a connection, which is randomly selected by default.
	DialTimeout time.Duration // Connection establishment timeout.
	Protocol    string        // protocol type.
}

GetOptions is the get conn configuration.

func NewGetOptions

func NewGetOptions() GetOptions

NewGetOptions creates and initializes GetOptions.

func (*GetOptions) WithContext

func (o *GetOptions) WithContext(ctx context.Context)

WithContext sets the request ctx.

func (*GetOptions) WithCustomReader

func (o *GetOptions) WithCustomReader(customReader func(io.Reader) io.Reader)

WithCustomReader sets a customReader. The connection pool uses this customReader to create a reader that wraps the underlying connection, which is usually used to add a buffer.

func (*GetOptions) WithDialTLS

func (o *GetOptions) WithDialTLS(certFile, keyFile, caFile, serverName string)

WithDialTLS sets the TLS parameters so that the client supports TLS.

func (*GetOptions) WithDialTimeout

func (o *GetOptions) WithDialTimeout(dur time.Duration)

WithDialTimeout sets the connection establishment timeout.

func (*GetOptions) WithFramerBuilder

func (o *GetOptions) WithFramerBuilder(fb codec.FramerBuilder)

WithFramerBuilder sets the FramerBuilder.

func (*GetOptions) WithLocalAddr

func (o *GetOptions) WithLocalAddr(addr string)

WithLocalAddr sets the local address used when establishing a connection. By default, it is selected randomly when there are multiple network cards.

func (*GetOptions) WithProtocol

func (o *GetOptions) WithProtocol(s string)

WithProtocol sets the backend service protocol name.

type HealthChecker

type HealthChecker func(pc *PoolConn, isFast bool) bool

HealthChecker is the idle connection health check function. It supports a quick check and a comprehensive check. The quick check is called when an idle connection is obtained and only checks whether the connection state is abnormal. The function returns true if the connection is usable.

type Option

type Option func(*Options)

Option is the Options helper.

func WithDialFunc

func WithDialFunc(d DialFunc) Option

WithDialFunc returns an Option which sets dial function.

func WithDialTimeout

func WithDialTimeout(t time.Duration) Option

WithDialTimeout returns an Option which sets the default timeout time for the connection pool to establish a connection.

func WithForceClose

func WithForceClose(f bool) Option

WithForceClose returns an Option which sets whether to force the connection to be closed.

func WithHealthChecker

func WithHealthChecker(c HealthChecker) Option

WithHealthChecker returns an Option which sets health checker.

func WithIdleTimeout

func WithIdleTimeout(t time.Duration) Option

WithIdleTimeout returns an Option which sets the idle connection time, after which it may be closed.

func WithMaxActive

func WithMaxActive(s int) Option

WithMaxActive returns an Option which sets the maximum number of active connections.

func WithMaxConnLifetime

func WithMaxConnLifetime(t time.Duration) Option

WithMaxConnLifetime returns an Option which sets the maximum lifetime of the connection, after which it may be closed.

func WithMaxIdle

func WithMaxIdle(m int) Option

WithMaxIdle returns an Option which sets the maximum number of idle connections.

func WithMinIdle

func WithMinIdle(n int) Option

WithMinIdle returns an Option which sets the number of initialized connections.

func WithPoolIdleTimeout

func WithPoolIdleTimeout(t time.Duration) Option

WithPoolIdleTimeout returns an Option which sets the pool idle timeout. After the timeout, ConnectionPool resources may be cleaned up.

func WithPushIdleConnToTail

func WithPushIdleConnToTail(c bool) Option

WithPushIdleConnToTail returns an Option which sets PushIdleConnToTail flag.

func WithWait

func WithWait(w bool) Option

WithWait returns an Option which sets whether to wait when the number of connections reaches the limit.

type Options

type Options struct {
	MinIdle   int // Initialize the number of connections, ready for the next io.
	MaxIdle   int // Maximum number of idle connections, 0 means no idle.
	MaxActive int // Maximum number of active connections, 0 means no limit.
	// Whether to wait when the maximum number of active connections is reached.
	Wait               bool
	IdleTimeout        time.Duration // idle connection timeout.
	MaxConnLifetime    time.Duration // Maximum lifetime of the connection.
	DialTimeout        time.Duration // Connection establishment timeout.
	ForceClose         bool
	Dial               DialFunc
	Checker            HealthChecker
	PushIdleConnToTail bool          // if true, ConnectionPool.put pushes idle connections to the tail of the idle list
	PoolIdleTimeout    time.Duration // ConnectionPool idle timeout
}

Options indicates pool configuration.

type Pool

type Pool interface {
	Get(network string, address string, opt GetOptions) (net.Conn, error)
}

Pool is the client connection pool interface. Its Get method takes the GetOptions data structure directly as an input parameter. Compared with passing functional options, this reduces memory escape and improves calling performance.

func NewConnectionPool

func NewConnectionPool(opt ...Option) Pool

NewConnectionPool creates a connection pool.

type PoolConn

type PoolConn struct {
	net.Conn
	// contains filtered or unexported fields
}

PoolConn is the connection in the connection pool.

func (*PoolConn) Close

func (pc *PoolConn) Close() error

Close overrides the Close method of net.Conn and puts the connection back into the connection pool.

func (*PoolConn) GetRawConn

func (pc *PoolConn) GetRawConn() net.Conn

GetRawConn gets raw connection in PoolConn.

func (*PoolConn) Read

func (pc *PoolConn) Read(b []byte) (int, error)

Read reads data on the connection.

func (*PoolConn) ReadFrame

func (pc *PoolConn) ReadFrame() ([]byte, error)

ReadFrame reads the frame.

func (*PoolConn) Write

func (pc *PoolConn) Write(b []byte) (int, error)

Write sends data on the connection.
