gcopool

package module
v0.0.0-...-23c2ab3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 10, 2023 License: Apache-2.0 Imports: 12 Imported by: 0

README

gcopool Travis-CI AppVeyor GoDoc Report card Sourcegraph

Connection pool derived from the Google's Spanner client. It can manage resources of any kind, supports health checks and support tracing.

Quick start

Implement resource interface is everything you need to start:

type Resource interface {
	ID() string
}

and go with:

pool, err := New(Config{
    CreateResource: createResource,
    MaxOpened: 10,
})
handle, err := pool.Take(context.Background())
doJob(handle.GetResource())
handle.Recycle()
pool.Close()

Resource

Resource is an entity that is managed by the pool. The pool supports health checking. The resource can be of 2 types: read-only something (transaction) resource and read-write something (transaction) resource.

type Resource interface {
	// Unique immutable resource id per pool
	ID() string
}

type Heartbeat interface {
	// Ping verifies if the resource is still alive.
	Ping(context.Context) error
}

type TX interface {
	Prepared() bool
	// Prepare called only once when transitioning from read-only to read-write mode
	Prepare(context.Context) error

	Begin(context.Context) error
	Commit(context.Context) error
	Abort(context.Context) error
}

type Hook interface {
	Handle(ctx context.Context, event EventCode, object interface{}) error
}

// ErrUnavailable is an error which can be returned by the ping
// implementation to report that the resource is unavailable
// any more and it must be removed from the pool.
var ErrUnavailable = errors.New("session is unavailable")
var ErrTXUnsupported = errors.New("read-write mode is unsupported")

const (
	EventAcquire = EventCode(1) // must NOT do blocking IO
	EventRelease = EventCode(2) // must NOT do blocking IO
	EventDestroy = EventCode(3) // have context with a deadline
)

Documentation

Overview

Package gcopool implements a thread safe resource pool to manage and reuse something related to the sessions or connections. The implementation is derived from the Google Spanner client. It supports max, min, idle lists, read/write resources, health checking and tracing.

Index

Constants

View Source
const (
	EventAcquire = EventCode(1) // must NOT do blocking IO
	EventRelease = EventCode(2) // must NOT do blocking IO
	EventDestroy = EventCode(3) // have context with a deadline
)

Variables

View Source
var (
	// OpenSessionCount is a measure of the number of sessions currently opened.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	OpenSessionCount = stats.Int64(statsPrefix+"open_session_count", "Number of sessions currently opened",
		stats.UnitDimensionless)

	// OpenSessionCountView is a view of the last value of OpenSessionCount.
	// It is EXPERIMENTAL and subject to change or removal without notice.
	OpenSessionCountView = &view.View{
		Name:        OpenSessionCount.Name(),
		Description: OpenSessionCount.Description(),
		Measure:     OpenSessionCount,
		Aggregation: view.LastValue(),
	}
)
View Source
var DefaultBackoff = ExponentialBackoff{minBackoff, maxBackoff}
View Source
var ErrGetSessionTimeout = errors.New("timeout / context canceled during getting session")

ErrGetSessionTimeout returns error for context timeout during Pool.Take().

View Source
var ErrInvalidSessionPool = errors.New("invalid session pool")
View Source
var ErrRetryable = errors.New("retry")
View Source
var ErrTXUnsupported = errors.New("read-write mode is unsupported")
View Source
var ErrUnavailable = errors.New("session is unavailable")

ErrUnavailable is an error which can be returned by the ping implementation to report that the resource is unavailable any more and it must be removed from the pool.

Functions

func RunRetryable

func RunRetryable(ctx context.Context, f func(context.Context) error) error

RunRetryable keeps attempting to run f until one of the following happens:

  1. f returns nil error or an unretryable error;
  2. context is cancelled or timeout.

TODO: consider using https://github.com/googleapis/gax-go/v2 once it becomes available internally.

Types

type Config

type Config struct {
	// CreateResource is the caller supplied method for getting a session, this makes session pool able to use pooling.
	// It must assign a unique ID to a resource across the pool lifetime.
	CreateResource func(context.Context) (Resource, error)
	// MaxOpened is the maximum number of opened sessions allowed by the session
	// pool. If the resource tries to open a session and
	// there are already MaxOpened sessions, it will block until one becomes
	// available or the context passed to the resource method is canceled or times out.
	MaxOpened uint64
	// MinOpened is the minimum number of opened sessions that the session pool
	// tries to maintain. Session pool won't continue to expire sessions if number
	// of opened connections drops below MinOpened. However, if a session is found
	// to be broken, it will still be evicted from the session pool, therefore it is
	// posssible that the number of opened sessions drops below MinOpened.
	MinOpened uint64
	// MaxIdle is the maximum number of idle sessions, pool is allowed to keep. Defaults to 0.
	MaxIdle uint64
	// MaxBurst is the maximum number of concurrent session creation requests. Defaults to 10.
	MaxBurst uint64
	// WriteSessions is the fraction of sessions we try to keep prepared for write.
	WriteSessions float64
	// HealthCheckWorkers is number of workers used by health checker for this pool.
	HealthCheckWorkers int
	// HealthCheckInterval is how often the health checker pings a session. Defaults to 5 min.
	HealthCheckInterval time.Duration
	// HealthCheckSampleInterval is how often the health checker samples live session (for use in maintaining session pool size). Defaults to 1 min.
	HealthCheckSampleInterval time.Duration
	// Labels for the sessions created in the session pool.
	Labels map[string]string
}

Config stores configurations of a session pool.

func (Config) Validate

func (spc Config) Validate() error

Validate verifies that the Config is good for use.

type EventCode

type EventCode int

type ExponentialBackoff

type ExponentialBackoff struct {
	Min, Max time.Duration
}

func (ExponentialBackoff) Delay

func (b ExponentialBackoff) Delay(retries int) time.Duration

delay calculates the delay that should happen at n-th exponential backoff in a series.

type Handle

type Handle struct {
	// contains filtered or unexported fields
}

Handle is an interface for transactions to access sessions safely. It is generated by Pool.Take().

func (*Handle) Destroy

func (h *Handle) Destroy()

Destroy destroys the inner session object. It is safe to call Destroy multiple times and only the first call would attempt to Destroy the inner session object.

func (*Handle) GetID

func (h *Handle) GetID() string

GetID gets the session ID from the internal session object. GetID returns empty string if the Handle is nil or the inner session object has been released by Recycle / Destroy.

func (*Handle) GetResource

func (h *Handle) GetResource() Resource

GetResource gets the resource associated with the session ID in Handle.

func (*Handle) Recycle

func (h *Handle) Recycle()

Recycle gives the inner session object back to its home session pool. It is safe to call Recycle multiple times but only the first one would take effect.

type Heartbeat

type Heartbeat interface {
	// Ping verifies if the resource is still alive.
	Ping(context.Context) error
}

type Hook

type Hook interface {
	Handle(ctx context.Context, event EventCode, object interface{}) error
}

type Pool

type Pool struct {

	// configuration of the session pool.
	Config
	// contains filtered or unexported fields
}

Pool creates and caches sessions.

func New

func New(config Config) (*Pool, error)

New creates a new session pool.

func (*Pool) Close

func (p *Pool) Close()

Close marks the session pool as closed.

func (*Pool) IsHealthy

func (p *Pool) IsHealthy(h *Handle) bool

func (*Pool) IsValid

func (p *Pool) IsValid() bool

IsValid checks if the session pool is still valid.

func (*Pool) Take

func (p *Pool) Take(ctx context.Context) (*Handle, error)

Take returns a cached session if there are available ones; if there isn't any, it tries to allocate a new one. Session returned by Take should be used for read operations.

func (*Pool) TakeWriteSession

func (p *Pool) TakeWriteSession(ctx context.Context) (*Handle, error)

TakeWriteSession returns a write prepared cached session if there are available ones; if there isn't any, it tries to allocate a new one. Session returned should be used for read write transactions.

type Resource

type Resource interface {
	// Unique immutable resource id per pool
	ID() string
}

type TX

type TX interface {
	Prepared() bool
	// Prepare called only once when transitioning from read-only to read-write mode
	Prepare(context.Context) error

	Begin(context.Context) error
	Commit(context.Context) error
	Abort(context.Context) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL