pooler

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2024 License: Apache-2.0 Imports: 8 Imported by: 0

README

go-pooler

Go Reference Release Go Report Card codecov Tests GitHub issues License

The go-pooler package provides a simple and generic resource pooler for managing reusable resources in Go. It is designed to handle resource acquisition, release, and health checks efficiently, making it suitable for high-concurrency applications.

Features

  • Acquiring and Releasing Resources by Key: Manage resources using unique keys.
  • Configurable Maximum Number of Open Resources: Limit the number of open resources.
  • Periodic Health Checks: Automatically perform health checks and resource cleanup at configurable intervals.
  • Pool Statistics: Gather statistics about the pool's usage, such as the number of open resources and wait times.

Installation

To install the package, use:

go get github.com/bartventer/go-pooler

Basic Usage

To create a new pool, define a resource that implements the Reusable interface and use the NewPool function:

package main

import (
	"context"

	"github.com/bartventer/go-pooler"
)

type Worker struct{}
type WorkerPool = pooler.Pool[*Worker]

func (w *Worker) Close() error                          { return nil }
func (w *Worker) PingContext(ctx context.Context) error { return nil }

func WorkerFactory() (*Worker, error) {
	return &Worker{}, nil
}

func NewWorkerPool(ctx context.Context, opts ...pooler.Option) *WorkerPool {
	return pooler.NewPool(ctx, WorkerFactory, opts...)
}

func main() {
	ctx := context.Background()
	p := NewWorkerPool(ctx, pooler.WithMaxOpenResources(1))

	_, err := p.Acquire(ctx, "key1")
	if err != nil {
		panic(err)
	}
	defer p.Release("key1")

	// Use the worker...
}

Documentation

Refer to the GoDoc for detailed documentation and examples.

License

This project is licensed under the Apache License, Version 2.0 - see the LICENSE file for details.

Documentation

Overview

Package pooler provides a generic resource pooler for managing reusable resources. It is designed to handle resource acquisition, release, and health checks efficiently, making it suitable for high-concurrency applications.

Features:

The following features are provided by the pooler package:

  • Acquiring and releasing resources by key.
  • Limiting the number of open resources.
  • Automatic health checks and resource cleanup at configurable intervals.
  • Statistics for monitoring the pool.

Usage

For example usage, see the documentation for the Pool type.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrFactoryError = errors.New("pool: factory error")

ErrFactoryError is returned when the factory function fails to create a resource.

Functions

This section is empty.

Types

type FactoryFunc

type FactoryFunc[T Reusable] func() (T, error)

FactoryFunc is a function type that creates a new reusable resource.

type Option

type Option func(*Options)

Option is a function type that configures an Options instance.

func WithHealthCheckInterval

func WithHealthCheckInterval(interval time.Duration) Option

WithHealthCheckInterval sets the interval for health checks.

func WithMaxOpenResources

func WithMaxOpenResources(n int) Option

WithMaxOpenResources sets the maximum number of allowed open resources.

type Options

type Options struct {
	MaxOpenResources    int           // maximum number of open resources
	HealthCheckInterval time.Duration // interval for health checks
}

Options defines the configuration options for a pool.

type Pool

type Pool[T Reusable] struct {
	// contains filtered or unexported fields
}

Pool implements Pooler for managing a pool of resources. It is safe for concurrent use by multiple goroutines.

Example
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/bartventer/go-pooler"
)

type (
	// Worker is a resource that can be pooled.
	Worker struct {
		closed bool
		mu     sync.Mutex
	}
	// WorkerPool is a pool of Worker instances.
	WorkerPool = pooler.Pool[*Worker]
)

// WorkerFactory creates a new Worker instance.
func WorkerFactory() (*Worker, error) {
	return &Worker{}, nil
}

// NewWorkerPool creates a new WorkerPool instance.
func NewWorkerPool(ctx context.Context, opts ...pooler.Option) *WorkerPool {
	return pooler.NewPool(ctx, WorkerFactory, opts...)
}

var _ pooler.Reusable = new(Worker)

func (m *Worker) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.closed = true
	return nil
}

func (m *Worker) PingContext(ctx context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.closed {
		return errors.New("worker is closed")
	}
	return nil
}

func main() {
	ctx := context.Background()
	p := NewWorkerPool(ctx, pooler.WithMaxOpenResources(1))

	_, err := p.Acquire(ctx, "key1")
	if err != nil {
		return
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	wg.Add(1)
	go func() {
		defer wg.Done()
		_, err := p.Acquire(ctx, "key2")
		errCh <- err
	}()

	// Wait for the goroutine to block on Acquire
	time.Sleep(100 * time.Millisecond)

	// The waiting goroutine should be in the wait queue
	stats := p.Stats()
	fmt.Println("OpenResources:", stats.OpenResources)
	fmt.Println("WaitCount:", stats.WaitCount)

	// Release a resource to unblock the waiting goroutine
	p.Release("key1")
	wg.Wait()
	close(errCh)

	stats = p.Stats()
	fmt.Println("OpenResources:", stats.OpenResources)
	fmt.Println("WaitCount:", stats.WaitCount)
	fmt.Println("Duration took longer than 100ms:", stats.WaitDuration > 100*time.Millisecond)

}
Output:

OpenResources: 1
WaitCount: 1
OpenResources: 1
WaitCount: 0
Duration took longer than 100ms: true
Example (HealthCheck)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

p := NewWorkerPool(ctx, pooler.WithHealthCheckInterval(1*time.Second))
worker, _ := p.Acquire(ctx, "key1")
worker.Close()

// Initial stats
stats := p.Stats()
fmt.Println("OpenResources before health check:", stats.OpenResources)

// Simulate a health check
time.Sleep(2 * time.Second)

stats = p.Stats()
fmt.Println("OpenResources after health check:", stats.OpenResources)
Output:

OpenResources before health check: 1
OpenResources after health check: 0

func NewPool

func NewPool[T Reusable](ctx context.Context, factory FactoryFunc[T], opts ...Option) *Pool[T]

NewPool creates a new pool of resources. It accepts a context, a factory function to create resources, and optional configuration functions.

func (*Pool[T]) Acquire

func (p *Pool[T]) Acquire(ctx context.Context, key string) (T, error)

Acquire acquires a resource by key. If the resource does not exist, it creates a new one using the factory function.

Example (Concurrent)
ctx := context.Background()
p := NewWorkerPool(ctx)

var wg sync.WaitGroup
for i := 0; i < 3; i++ {
	wg.Add(1)
	go func(i int) {
		defer wg.Done()
		key := fmt.Sprintf("key%d", i)
		_, _ = p.Acquire(ctx, key)
		fmt.Println("Worker acquired:", key)
	}(i)
}

wg.Wait()
Output:

Worker acquired: key0
Worker acquired: key1
Worker acquired: key2
Example (Release)
ctx := context.Background()
p := NewWorkerPool(ctx)

resource, err := p.Acquire(ctx, "key1")
if err != nil {
	fmt.Println("Failed to acquire resource:", err)
	return
}
fmt.Printf("Worker acquired: %+v\n", resource)

p.Release("key1")
fmt.Println("Worker released")
Output:

Worker acquired: &{closed:false mu:{state:0 sema:0}}
Worker released

func (*Pool[T]) All

func (p *Pool[T]) All() iter.Seq2[string, T]

All returns an iterator that yields All resources in the pool.

Example
ctx := context.Background()
p := NewWorkerPool(ctx)

for i := 0; i < 3; i++ {
	key := fmt.Sprintf("key%d", i)
	_, _ = p.Acquire(ctx, key)
}

for key, worker := range p.All() {
	fmt.Printf("%s %+v\n", key, worker)
}
Output:

key0 &{closed:false mu:{state:0 sema:0}}
key1 &{closed:false mu:{state:0 sema:0}}
key2 &{closed:false mu:{state:0 sema:0}}

func (*Pool[T]) Contains

func (p *Pool[T]) Contains(key string) bool

Contains reports whether a resource exists by key.

func (*Pool[T]) Release

func (p *Pool[T]) Release(key string)

Release releases a resource by key. It closes the resource and removes it from the pool.

func (*Pool[T]) ReleaseAll

func (p *Pool[T]) ReleaseAll()

ReleaseAll releases all resources in the pool.

Example
ctx := context.Background()
p := NewWorkerPool(ctx)

for i := 0; i < 3; i++ {
	_, _ = p.Acquire(ctx, fmt.Sprintf("key%d", i))
}
stats := p.Stats()
fmt.Println("OpenResources before ReleaseAll:", stats.OpenResources)

p.ReleaseAll()
stats = p.Stats()
fmt.Println("OpenResources after ReleaseAll:", stats.OpenResources)
Output:

OpenResources before ReleaseAll: 3
OpenResources after ReleaseAll: 0

func (*Pool[T]) Stats

func (p *Pool[T]) Stats() Stats

Stats returns a snapshot of the pool's statistics.

func (*Pool[T]) Walk

func (p *Pool[T]) Walk() (next func() (key string, value T, ok bool), stop func())

Walk returns a "pull iterator" that yields all resources in the pool.

Example

ExamplePool_Walk demonstrates iterating over all resources using the Walk method.

ctx := context.Background()
p := NewWorkerPool(ctx)

for i := 0; i < 3; i++ {
	key := fmt.Sprintf("key%d", i)
	_, _ = p.Acquire(ctx, key)
}

next, stop := p.Walk()
defer stop()
for {
	key, worker, ok := next()
	if !ok {
		break
	}
	fmt.Printf("%s %+v\n", key, worker)
}
Output:

key0 &{closed:false mu:{state:0 sema:0}}
key1 &{closed:false mu:{state:0 sema:0}}
key2 &{closed:false mu:{state:0 sema:0}}

type Pooler

type Pooler[T Reusable] interface {
	// Acquire acquires a resource by key.
	Acquire(ctx context.Context, key string) (T, error)
	// Release releases a resource by key.
	Release(key string)
	// Contains reports whether a resource exists by key.
	Contains(key string) bool
	// ReleaseAll releases all resources in the pool.
	ReleaseAll()
	// Stats returns the statistics for the pool.
	Stats() Stats
}

Pooler defines the interface for acquiring and releasing resources. The pooler can acquire a resource by key, release a resource by key, check if a resource exists by key, release all resources in the pool, and return the statistics for the pool.

type Reusable

type Reusable interface {
	// Close closes the resource.
	Close() error
	// PingContext performs a health check on the resource.
	PingContext(ctx context.Context) error
}

Reusable defines the interface for a resource that can be pooled. The resource must be able to close itself and perform a health check.

type Stats

type Stats struct {
	MaxOpenResources int           // maximum number of resources
	OpenResources    int           // number of open resources
	WaitCount        int64         // number of waiters
	WaitDuration     time.Duration // total time blocked waiting for a resource (from the time Acquire is called)
}

Stats defines the statistics for a pool.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL