Documentation
¶
Overview ¶
Package pooler provides a generic resource pooler for managing reusable resources. It is designed to handle resource acquisition, release, and health checks efficiently, making it suitable for high-concurrency applications.
Features: ¶
The following features are provided by the pooler package:
- Acquiring and releasing resources by key.
- Limiting the number of open resources.
- Automatic health checks and resource cleanup at configurable intervals.
- Statistics for monitoring the pool.
Usage ¶
For example usage, see the documentation for the Pool type.
Index ¶
- Variables
- type FactoryFunc
- type Option
- type Options
- type Pool
- func (p *Pool[T]) Acquire(ctx context.Context, key string) (T, error)
- func (p *Pool[T]) All() iter.Seq2[string, T]
- func (p *Pool[T]) Contains(key string) bool
- func (p *Pool[T]) Release(key string)
- func (p *Pool[T]) ReleaseAll()
- func (p *Pool[T]) Stats() Stats
- func (p *Pool[T]) Walk() (next func() (key string, value T, ok bool), stop func())
- type Pooler
- type Reusable
- type Stats
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrFactoryError = errors.New("pool: factory error")
ErrFactoryError is returned when the factory function fails to create a resource.
Functions ¶
This section is empty.
Types ¶
type FactoryFunc ¶
FactoryFunc is a function type that creates a new reusable resource.
type Option ¶
type Option func(*Options)
Option is a function type that configures an Options instance.
func WithHealthCheckInterval ¶
WithHealthCheckInterval sets the interval for health checks.
func WithMaxOpenResources ¶
WithMaxOpenResources sets the maximum number of allowed open resources.
type Options ¶
type Options struct {
MaxOpenResources int // maximum number of open resources
HealthCheckInterval time.Duration // interval for health checks
}
Options defines the configuration options for a pool.
type Pool ¶
type Pool[T Reusable] struct { // contains filtered or unexported fields }
Pool implements Pooler for managing a pool of resources. It is safe for concurrent use by multiple goroutines.
Example ¶
package main
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/bartventer/go-pooler"
)
type (
// Worker is a resource that can be pooled.
Worker struct {
closed bool
mu sync.Mutex
}
// WorkerPool is a pool of Worker instances.
WorkerPool = pooler.Pool[*Worker]
)
// WorkerFactory creates a new Worker instance.
func WorkerFactory() (*Worker, error) {
return &Worker{}, nil
}
// NewWorkerPool creates a new WorkerPool instance.
func NewWorkerPool(ctx context.Context, opts ...pooler.Option) *WorkerPool {
return pooler.NewPool(ctx, WorkerFactory, opts...)
}
var _ pooler.Reusable = new(Worker)
func (m *Worker) Close() error {
m.mu.Lock()
defer m.mu.Unlock()
m.closed = true
return nil
}
func (m *Worker) PingContext(ctx context.Context) error {
m.mu.Lock()
defer m.mu.Unlock()
if m.closed {
return errors.New("worker is closed")
}
return nil
}
func main() {
ctx := context.Background()
p := NewWorkerPool(ctx, pooler.WithMaxOpenResources(1))
_, err := p.Acquire(ctx, "key1")
if err != nil {
return
}
var wg sync.WaitGroup
errCh := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
_, err := p.Acquire(ctx, "key2")
errCh <- err
}()
// Wait for the goroutine to block on Acquire
time.Sleep(100 * time.Millisecond)
// The waiting goroutine should be in the wait queue
stats := p.Stats()
fmt.Println("OpenResources:", stats.OpenResources)
fmt.Println("WaitCount:", stats.WaitCount)
// Release a resource to unblock the waiting goroutine
p.Release("key1")
wg.Wait()
close(errCh)
stats = p.Stats()
fmt.Println("OpenResources:", stats.OpenResources)
fmt.Println("WaitCount:", stats.WaitCount)
fmt.Println("Duration took longer than 100ms:", stats.WaitDuration > 100*time.Millisecond)
}
Output: OpenResources: 1 WaitCount: 1 OpenResources: 1 WaitCount: 0 Duration took longer than 100ms: true
Example (HealthCheck) ¶
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := NewWorkerPool(ctx, pooler.WithHealthCheckInterval(1*time.Second))
worker, _ := p.Acquire(ctx, "key1")
worker.Close()
// Initial stats
stats := p.Stats()
fmt.Println("OpenResources before health check:", stats.OpenResources)
// Simulate a health check
time.Sleep(2 * time.Second)
stats = p.Stats()
fmt.Println("OpenResources after health check:", stats.OpenResources)
Output: OpenResources before health check: 1 OpenResources after health check: 0
func NewPool ¶
NewPool creates a new pool of resources. It accepts a context, a factory function to create resources, and optional configuration functions.
func (*Pool[T]) Acquire ¶
Acquire acquires a resource by key. If the resource does not exist, it creates a new one using the factory function.
Example (Concurrent) ¶
ctx := context.Background()
p := NewWorkerPool(ctx)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
key := fmt.Sprintf("key%d", i)
_, _ = p.Acquire(ctx, key)
fmt.Println("Worker acquired:", key)
}(i)
}
wg.Wait()
Output: Worker acquired: key0 Worker acquired: key1 Worker acquired: key2
Example (Release) ¶
ctx := context.Background()
p := NewWorkerPool(ctx)
resource, err := p.Acquire(ctx, "key1")
if err != nil {
fmt.Println("Failed to acquire resource:", err)
return
}
fmt.Printf("Worker acquired: %+v\n", resource)
p.Release("key1")
fmt.Println("Worker released")
Output: Worker acquired: &{closed:false mu:{state:0 sema:0}} Worker released
func (*Pool[T]) All ¶
All returns an iterator that yields All resources in the pool.
Example ¶
ctx := context.Background()
p := NewWorkerPool(ctx)
for i := 0; i < 3; i++ {
key := fmt.Sprintf("key%d", i)
_, _ = p.Acquire(ctx, key)
}
for key, worker := range p.All() {
fmt.Printf("%s %+v\n", key, worker)
}
Output: key0 &{closed:false mu:{state:0 sema:0}} key1 &{closed:false mu:{state:0 sema:0}} key2 &{closed:false mu:{state:0 sema:0}}
func (*Pool[T]) Release ¶
Release releases a resource by key. It closes the resource and removes it from the pool.
func (*Pool[T]) ReleaseAll ¶
func (p *Pool[T]) ReleaseAll()
ReleaseAll releases all resources in the pool.
Example ¶
ctx := context.Background()
p := NewWorkerPool(ctx)
for i := 0; i < 3; i++ {
_, _ = p.Acquire(ctx, fmt.Sprintf("key%d", i))
}
stats := p.Stats()
fmt.Println("OpenResources before ReleaseAll:", stats.OpenResources)
p.ReleaseAll()
stats = p.Stats()
fmt.Println("OpenResources after ReleaseAll:", stats.OpenResources)
Output: OpenResources before ReleaseAll: 3 OpenResources after ReleaseAll: 0
func (*Pool[T]) Walk ¶
Walk returns a "pull iterator" that yields all resources in the pool.
Example ¶
ExamplePool_Walk demonstrates iterating over all resources using the Walk method.
ctx := context.Background()
p := NewWorkerPool(ctx)
for i := 0; i < 3; i++ {
key := fmt.Sprintf("key%d", i)
_, _ = p.Acquire(ctx, key)
}
next, stop := p.Walk()
defer stop()
for {
key, worker, ok := next()
if !ok {
break
}
fmt.Printf("%s %+v\n", key, worker)
}
Output: key0 &{closed:false mu:{state:0 sema:0}} key1 &{closed:false mu:{state:0 sema:0}} key2 &{closed:false mu:{state:0 sema:0}}
type Pooler ¶
type Pooler[T Reusable] interface { // Acquire acquires a resource by key. Acquire(ctx context.Context, key string) (T, error) // Release releases a resource by key. Release(key string) // Contains reports whether a resource exists by key. Contains(key string) bool // ReleaseAll releases all resources in the pool. ReleaseAll() // Stats returns the statistics for the pool. Stats() Stats }
Pooler defines the interface for acquiring and releasing resources. The pooler can acquire a resource by key, release a resource by key, check if a resource exists by key, release all resources in the pool, and return the statistics for the pool.
type Reusable ¶
type Reusable interface {
// Close closes the resource.
Close() error
// PingContext performs a health check on the resource.
PingContext(ctx context.Context) error
}
Reusable defines the interface for a resource that can be pooled. The resource must be able to close itself and perform a health check.
type Stats ¶
type Stats struct {
MaxOpenResources int // maximum number of resources
OpenResources int // number of open resources
WaitCount int64 // number of waiters
WaitDuration time.Duration // total time blocked waiting for a resource (from the time Acquire is called)
}
Stats defines the statistics for a pool.