Documentation
¶
Overview ¶
Package pooler provides a generic resource pooler for managing reusable resources. It is designed to handle resource acquisition, release, and health checks efficiently, making it suitable for high-concurrency applications.
Features: ¶
The following features are provided by the pooler package:
- Acquiring and releasing resources by key.
- Limiting the number of open resources.
- Automatic health checks and resource cleanup at configurable intervals.
- Statistics for monitoring the pool.
Usage ¶
For example usage, see the documentation for the Pool type.
Index ¶
- Variables
- type FactoryFunc
- type Option
- type Options
- type Pool
- func (p *Pool[T]) Acquire(ctx context.Context, key string) (T, error)
- func (p *Pool[T]) All() iter.Seq2[string, T]
- func (p *Pool[T]) Contains(key string) bool
- func (p *Pool[T]) Release(key string)
- func (p *Pool[T]) ReleaseAll()
- func (p *Pool[T]) Stats() Stats
- func (p *Pool[T]) Walk() (next func() (key string, value T, ok bool), stop func())
- type Pooler
- type Reusable
- type Stats
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrFactoryError = errors.New("pool: factory error")
ErrFactoryError is returned when the factory function fails to create a resource.
Functions ¶
This section is empty.
Types ¶
type FactoryFunc ¶
FactoryFunc is a function type that creates a new reusable resource.
type Option ¶
type Option func(*Options)
Option is a function type that configures an Options instance.
func WithHealthCheckInterval ¶
WithHealthCheckInterval sets the interval for health checks.
func WithMaxOpenResources ¶
WithMaxOpenResources sets the maximum number of allowed open resources.
type Options ¶
type Options struct { MaxOpenResources int // maximum number of open resources HealthCheckInterval time.Duration // interval for health checks }
Options defines the configuration options for a pool.
type Pool ¶
type Pool[T Reusable] struct { // contains filtered or unexported fields }
Pool implements Pooler for managing a pool of resources. It is safe for concurrent use by multiple goroutines.
Example ¶
package main import ( "context" "errors" "fmt" "sync" "time" "github.com/bartventer/go-pooler" ) type ( // Worker is a resource that can be pooled. Worker struct { closed bool mu sync.Mutex } // WorkerPool is a pool of Worker instances. WorkerPool = pooler.Pool[*Worker] ) // WorkerFactory creates a new Worker instance. func WorkerFactory() (*Worker, error) { return &Worker{}, nil } // NewWorkerPool creates a new WorkerPool instance. func NewWorkerPool(ctx context.Context, opts ...pooler.Option) *WorkerPool { return pooler.NewPool(ctx, WorkerFactory, opts...) } var _ pooler.Reusable = new(Worker) func (m *Worker) Close() error { m.mu.Lock() defer m.mu.Unlock() m.closed = true return nil } func (m *Worker) PingContext(ctx context.Context) error { m.mu.Lock() defer m.mu.Unlock() if m.closed { return errors.New("worker is closed") } return nil } func main() { ctx := context.Background() p := NewWorkerPool(ctx, pooler.WithMaxOpenResources(1)) _, err := p.Acquire(ctx, "key1") if err != nil { return } var wg sync.WaitGroup errCh := make(chan error, 1) wg.Add(1) go func() { defer wg.Done() _, err := p.Acquire(ctx, "key2") errCh <- err }() // Wait for the goroutine to block on Acquire time.Sleep(100 * time.Millisecond) // The waiting goroutine should be in the wait queue stats := p.Stats() fmt.Println("OpenResources:", stats.OpenResources) fmt.Println("WaitCount:", stats.WaitCount) // Release a resource to unblock the waiting goroutine p.Release("key1") wg.Wait() close(errCh) stats = p.Stats() fmt.Println("OpenResources:", stats.OpenResources) fmt.Println("WaitCount:", stats.WaitCount) fmt.Println("Duration took longer than 100ms:", stats.WaitDuration > 100*time.Millisecond) }
Output: OpenResources: 1 WaitCount: 1 OpenResources: 1 WaitCount: 0 Duration took longer than 100ms: true
Example (HealthCheck) ¶
ctx, cancel := context.WithCancel(context.Background()) defer cancel() p := NewWorkerPool(ctx, pooler.WithHealthCheckInterval(1*time.Second)) worker, _ := p.Acquire(ctx, "key1") worker.Close() // Initial stats stats := p.Stats() fmt.Println("OpenResources before health check:", stats.OpenResources) // Simulate a health check time.Sleep(2 * time.Second) stats = p.Stats() fmt.Println("OpenResources after health check:", stats.OpenResources)
Output: OpenResources before health check: 1 OpenResources after health check: 0
func NewPool ¶
NewPool creates a new pool of resources. It accepts a context, a factory function to create resources, and optional configuration functions.
func (*Pool[T]) Acquire ¶
Acquire acquires a resource by key. If the resource does not exist, it creates a new one using the factory function.
Example (Concurrent) ¶
ctx := context.Background() p := NewWorkerPool(ctx) var wg sync.WaitGroup for i := 0; i < 3; i++ { wg.Add(1) go func(i int) { defer wg.Done() key := fmt.Sprintf("key%d", i) _, _ = p.Acquire(ctx, key) fmt.Println("Worker acquired:", key) }(i) } wg.Wait()
Output: Worker acquired: key0 Worker acquired: key1 Worker acquired: key2
Example (Release) ¶
ctx := context.Background() p := NewWorkerPool(ctx) resource, err := p.Acquire(ctx, "key1") if err != nil { fmt.Println("Failed to acquire resource:", err) return } fmt.Printf("Worker acquired: %+v\n", resource) p.Release("key1") fmt.Println("Worker released")
Output: Worker acquired: &{closed:false mu:{state:0 sema:0}} Worker released
func (*Pool[T]) All ¶
All returns an iterator that yields All resources in the pool.
Example ¶
ctx := context.Background() p := NewWorkerPool(ctx) for i := 0; i < 3; i++ { key := fmt.Sprintf("key%d", i) _, _ = p.Acquire(ctx, key) } for key, worker := range p.All() { fmt.Printf("%s %+v\n", key, worker) }
Output: key0 &{closed:false mu:{state:0 sema:0}} key1 &{closed:false mu:{state:0 sema:0}} key2 &{closed:false mu:{state:0 sema:0}}
func (*Pool[T]) Release ¶
Release releases a resource by key. It closes the resource and removes it from the pool.
func (*Pool[T]) ReleaseAll ¶
func (p *Pool[T]) ReleaseAll()
ReleaseAll releases all resources in the pool.
Example ¶
ctx := context.Background() p := NewWorkerPool(ctx) for i := 0; i < 3; i++ { _, _ = p.Acquire(ctx, fmt.Sprintf("key%d", i)) } stats := p.Stats() fmt.Println("OpenResources before ReleaseAll:", stats.OpenResources) p.ReleaseAll() stats = p.Stats() fmt.Println("OpenResources after ReleaseAll:", stats.OpenResources)
Output: OpenResources before ReleaseAll: 3 OpenResources after ReleaseAll: 0
func (*Pool[T]) Walk ¶
Walk returns a "pull iterator" that yields all resources in the pool.
Example ¶
ExamplePool_Walk demonstrates iterating over all resources using the Walk method.
ctx := context.Background() p := NewWorkerPool(ctx) for i := 0; i < 3; i++ { key := fmt.Sprintf("key%d", i) _, _ = p.Acquire(ctx, key) } next, stop := p.Walk() defer stop() for { key, worker, ok := next() if !ok { break } fmt.Printf("%s %+v\n", key, worker) }
Output: key0 &{closed:false mu:{state:0 sema:0}} key1 &{closed:false mu:{state:0 sema:0}} key2 &{closed:false mu:{state:0 sema:0}}
type Pooler ¶
type Pooler[T Reusable] interface { // Acquire acquires a resource by key. Acquire(ctx context.Context, key string) (T, error) // Release releases a resource by key. Release(key string) // Contains reports whether a resource exists by key. Contains(key string) bool // ReleaseAll releases all resources in the pool. ReleaseAll() // Stats returns the statistics for the pool. Stats() Stats }
Pooler defines the interface for acquiring and releasing resources. The pooler can acquire a resource by key, release a resource by key, check if a resource exists by key, release all resources in the pool, and return the statistics for the pool.
type Reusable ¶
type Reusable interface { // Close closes the resource. Close() error // PingContext performs a health check on the resource. PingContext(ctx context.Context) error }
Reusable defines the interface for a resource that can be pooled. The resource must be able to close itself and perform a health check.
type Stats ¶
type Stats struct { MaxOpenResources int // maximum number of resources OpenResources int // number of open resources WaitCount int64 // number of waiters WaitDuration time.Duration // total time blocked waiting for a resource (from the time Acquire is called) }
Stats defines the statistics for a pool.