pool

package
v1.0.0
Published: Aug 3, 2021 License: MIT Imports: 13 Imported by: 1

Documentation

Constants

This section is empty.

Variables

var (
	// DefaultConnBatch : maximum number of connection instances stored for a single batch
	DefaultConnBatch uint64 = 20

	// DefaultMaxPoolSize : the total size of the connection pool to store the gRPC connection instances
	DefaultMaxPoolSize uint64 = 60

	// DefaultScheme : gRPC connection scheme to override the default scheme "passthrough" to "dns"
	DefaultScheme string = "dns"

	// DefaultGrpcInsecure : bool flag that enables/disables the insecure (plaintext) connection mode
	DefaultGrpcInsecure bool = true

	// DefaultInterceptor : the library currently supports only one interceptor type, the unary client interceptor, used to send a message to the server that doesn't expect a response
	DefaultInterceptor ConnectionInterceptor = UnaryClient

	// DefaultRetriableCodes : possible retriable gRPC connection failure codes
	DefaultRetriableCodes = []codes.Code{codes.Aborted, codes.Unknown, codes.ResourceExhausted, codes.Unavailable}

	ConnIndex         uint64 = 0
	ConnPoolPipeline  uint64 = 0
	ConnRecreateCount uint64 = 0
	IsConnRecreate    bool   = false
)

Functions

This section is empty.

Types

type ConnPool

type ConnPool struct {
	Conn              *grpc.ClientConn
	MaxPoolSize       uint64
	ConnInstanceBatch *batch.Batch
	ConnBatchQueue    *Queue
	Options           *PoolConnOptions
	PipelineDoneChan  chan interface{}
	Log               grpclog.LoggerV2
}

ConnPool struct defines several fields to construct a connection pool. The gRPC connection instance is replicated using batch processing and queried with reflect.SelectCase, which picks a connection object from the list of cases as a pseudo-random choice; a standalone sketch of that selection mechanism follows the field descriptions below.

Conn: The base gRPC connection that keeps the initial client connection instance.

MaxPoolSize: The maximum number of connections created concurrently in the connection pool.

ConnInstanceBatch: Batch processing helps create multiple replicas of the base gRPC connection instance concurrently.
The https://github.com/Deeptiman/go-batch library is used to perform the batch processing.

Options: Holds the connection pool configuration used to build the gRPC DialOptions for the base connection instance.

PipelineDoneChan: The connection pool runs a concurrency pipeline; the PipelineDoneChan channel is signaled after all stages of the pipeline finish.

Log: The gRPC logger exposes the internal connection lifecycle, which is useful for debugging the connection flow.
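
The pseudo-random selection described above is the behavior of the standard library's reflect.Select, which chooses uniformly among the ready cases. A minimal, standalone sketch of that mechanism (independent of this package's types):

package main

import (
	"fmt"
	"reflect"
)

func main() {
	// Build a few buffered channels that each already hold a value,
	// mimicking a pool in which several connections are ready at once.
	cases := make([]reflect.SelectCase, 0, 3)
	for i := 0; i < 3; i++ {
		ch := make(chan int, 1)
		ch <- i
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(ch),
		})
	}

	// reflect.Select picks pseudo-randomly among the ready cases, which is
	// how the pool spreads lookups across its stored connections.
	chosen, value, _ := reflect.Select(cases)
	fmt.Printf("picked case %d with value %v\n", chosen, value.Interface())
}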

func NewConnPool

func NewConnPool(opts ...PoolOptions) *ConnPool

NewConnPool creates the connection pool object and instantiates the configuration for the connection batch, retry options, and interceptor.
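
A minimal construction sketch, assuming only the functional options documented below; the import path and target address are placeholders:

package main

import (
	"log"

	// Import path assumed; adjust to the actual module path of this package.
	pool "github.com/Deeptiman/grpc-connection-library"
)

func main() {
	p := pool.NewConnPool(
		pool.WithAddress("localhost:50051"), // placeholder target
		pool.WithScheme("dns"),              // same as DefaultScheme
		pool.WithInsecure(true),             // same as DefaultGrpcInsecure
	)
	log.Printf("pool configured, default max size %d", pool.DefaultMaxPoolSize)
	_ = p
}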

func (*ConnPool) ClientConn

func (c *ConnPool) ClientConn() (*grpc.ClientConn, error)

ClientConn creates the initial gRPC client connection instance. The connection factory works as a higher-order function that applies the gRPC retry policy when a connection attempt fails.
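
Continuing the construction sketch above (p is the *ConnPool created there), a hedged usage snippet; the generated client stub mentioned in the comment is hypothetical:

conn, err := p.ClientConn()
if err != nil {
	log.Fatalf("dial failed after retries: %v", err)
}
// Hand conn to any generated gRPC client, e.g. a hypothetical
// pb.NewGreeterClient(conn); the retry policy is applied by the factory.
_ = conn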

func (*ConnPool) ConnectionPoolPipeline

func (c *ConnPool) ConnectionPoolPipeline(conn *grpc.ClientConn, pipelineDoneChan chan interface{})

ConnectionPoolPipeline follows the concurrency pipeline technique to create a connection pool for highly concurrent scenarios. The pipeline has several stages that use the fan-in/fan-out technique to process data through channels.

The pipeline technique makes the whole process of creating the connection pool a single composable function. The pipeline has four stages that work as a generator pattern to build the connection pool.
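
A hedged sketch of driving the pipeline manually, assuming the done channel is signaled once all stages finish (as described for PipelineDoneChan above) and that conn comes from ClientConn; the pool may also run this pipeline internally:

done := make(chan interface{})
go p.ConnectionPoolPipeline(conn, done)
<-done // wait for every pipeline stage to finish

log.Printf("connections in pool: %d", p.GetConnPoolSize())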

func (*ConnPool) EnqueConnBatch

func (c *ConnPool) EnqueConnBatch(connItems batch.BatchItems)

EnqueConnBatch enqueues the batch items received from the go-batch supply channel.

func (*ConnPool) GetConnBatch

func (c *ConnPool) GetConnBatch() batch.BatchItems

GetConnBatch retrieves a batch item from the connection queue, which dequeues items using the pseudo-random selection technique.
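
A hedged round-trip sketch, continuing from the earlier snippets; the zero-valued batch item stands in for items that would normally arrive from the go-batch supply channel (the batch package is github.com/Deeptiman/go-batch, referenced above):

var items batch.BatchItems // stand-in value, for illustration only
p.EnqueConnBatch(items)

got := p.GetConnBatch() // dequeues one batch item pseudo-randomly
_ = got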

func (*ConnPool) GetConnPoolSize

func (c *ConnPool) GetConnPoolSize() uint64

GetConnPoolSize loads the number of connections created by the connection pool.

type ConnectionInterceptor

type ConnectionInterceptor int

ConnectionInterceptor defines the interceptor type [UnaryServer, UnaryClient] for a gRPC connection.

const (
	// ConnectionInterceptor constants
	UnaryServer ConnectionInterceptor = iota
	UnaryClient
)

type PoolConnOptions

type PoolConnOptions struct {
	// contains filtered or unexported fields
}

type PoolOptions

type PoolOptions func(*ConnPool)

func WithAddress

func WithAddress(address string) PoolOptions

func WithAuthority

func WithAuthority(authority string) PoolOptions

func WithCodes

func WithCodes(codes []codes.Code) PoolOptions

func WithConnectionInterceptor

func WithConnectionInterceptor(interceptor ConnectionInterceptor) PoolOptions

func WithCredentials

func WithCredentials(credentials credentials.TransportCredentials) PoolOptions

func WithInsecure

func WithInsecure(insecure bool) PoolOptions

func WithRetry

func WithRetry(retry int) PoolOptions

func WithScheme

func WithScheme(scheme string) PoolOptions
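
A hedged sketch combining the options above; every value is a placeholder, WithCredentials and WithInsecure would normally be mutually exclusive, and the codes and credentials packages are google.golang.org/grpc/codes and google.golang.org/grpc/credentials:

creds, err := credentials.NewClientTLSFromFile("server.crt", "") // placeholder certificate
if err != nil {
	log.Fatal(err)
}

p := pool.NewConnPool(
	pool.WithAddress("example.com:443"), // placeholder target
	pool.WithAuthority("example.com"),
	pool.WithScheme("dns"),
	pool.WithRetry(3),
	pool.WithCodes([]codes.Code{codes.Unavailable, codes.Aborted}),
	pool.WithConnectionInterceptor(pool.UnaryClient),
	pool.WithCredentials(creds),
	pool.WithInsecure(false),
)
_ = p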

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue struct defines the constructs used to enqueue/dequeue batch processing items.

size: The total size of the connection queue; it corresponds to the value of MaxPoolSize.

itemSelect: The list of select cases backed by the enqueued channels, from which items are dequeued using pseudo-random selection.

enqueCh: Stores the array of batch-item channels, each of which is added to the reflect.SelectCase list.

sem: The semaphore synchronizes the enqueue/dequeue processes through blocking Acquire/Release channels.

log: Uses the logrus library to provide logging inside the library.

func NewQueue

func NewQueue(size uint64) *Queue

NewQueue instantiates a new Queue with the given size and initializes its internal structures, such as the channel array and the reflect.SelectCase list.

func (*Queue) Dequeue

func (q *Queue) Dequeue() batch.BatchItems

Dequeue picks a batch item from the channel cases using the pseudo-random selection technique.

func (*Queue) Enqueue

func (q *Queue) Enqueue(item batch.BatchItems)

Enqueue appends the received batch item to the channel queue array and also adds it to the []reflect.SelectCase as a channel case.

func (*Queue) GetEnqueCh

func (q *Queue) GetEnqueCh(index int) chan batch.BatchItems

GetEnqueCh returns the enqueued batch-item channel for an index.
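
A hedged sketch of the queue round-trip on its own, using a zero-valued batch item purely for illustration:

q := pool.NewQueue(4)

var item batch.BatchItems // zero value, for illustration only
q.Enqueue(item)

got := q.Dequeue() // picks a ready channel case pseudo-randomly
_ = got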

type Semaphore

type Semaphore struct {
	// contains filtered or unexported fields
}

func NewSemaphore

func NewSemaphore(n uint64) *Semaphore

func (Semaphore) Acquire

func (s Semaphore) Acquire(n uint64)

func (Semaphore) Lock

func (s Semaphore) Lock()

func (Semaphore) RLock

func (s Semaphore) RLock()

func (Semaphore) RUnlock

func (s Semaphore) RUnlock()

func (Semaphore) Release

func (s Semaphore) Release(n uint64)

func (Semaphore) Unlock

func (s Semaphore) Unlock()
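
The Semaphore's documented surface is only the constructor plus Acquire/Release and the lock methods; a hedged sketch that bounds concurrency with Acquire/Release (sync.WaitGroup is used only to wait for the workers):

sem := pool.NewSemaphore(2) // allow at most two workers at a time

var wg sync.WaitGroup
for i := 0; i < 5; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		sem.Acquire(1)
		defer sem.Release(1)
		// ... work that needs a bounded degree of concurrency ...
	}()
}
wg.Wait()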
