Documentation ¶
Overview ¶
options.go — functional options for Pool construction.
Change log ¶
- v0.1: Initial set: WithAutoScale, WithTTL, WithCrashHandler, WithHealthInterval.
Pattern ¶
All public options follow the functional-options pattern (Dave Cheney, 2014). Each option is a function that mutates an internal `config` struct. They are applied in order inside New[C], before any goroutines are started, so there are no synchronization requirements here.
Adding a new option ¶
- Add the field to `config`.
- Set a sensible default in `defaultConfig()`.
- Write a `WithXxx` function below.
- Document the zero-value behaviour in the comment.
pool.go — Session-affine process pool with singleflight Acquire.
Core invariant ¶
1 sessionID → 1 Worker, for the lifetime of the session.
Concurrency model (read this before touching Acquire) ¶
There are three maps protected by a single mutex (p.mu):
p.sessions map[string]Worker[C] — live sessionID → worker bindings
p.inflight map[string]chan struct{} — in-progress Acquire for a session
p.lastAccessed map[string]time.Time — last-touch timestamps for the TTL sweeper (see pool_ttl.go)
And one lock-free channel:
p.available chan Worker[C] — free workers ready to be assigned
The singleflight guarantee for Acquire(ctx, sessionID):
1. Lock → check sessions → if found: unlock, return (FAST PATH).
2. Lock → check inflight → if pending: grab chan, unlock, wait on it, then restart from step 1 when chan closes.
3. Lock → create inflight[sessionID] = make(chan struct{}) → unlock.
4. Block on <-p.available (or ctx cancel).
5. Call w.Healthy(ctx). If unhealthy: discard, close inflight chan, return error.
6. Lock → sessions[sessionID]=w, delete inflight[sid], close(ch) → unlock. Closing ch broadcasts to all goroutines waiting in step 2.
7. Return &Session[C]{…}
Why a chan struct{} instead of sync.Mutex per session?
- A mutex would only let one waiter in. We need ALL waiters to unblock when the acquiring goroutine completes (step 6 closes ch → zero-copy broadcast).
- Closing a channel is safe to call exactly once and is always non-blocking.
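The broadcast behaviour of close is easy to demonstrate in isolation. This is a standalone toy, not the pool's code: every goroutine blocked on the same channel unblocks from a single close, which a sync.Mutex cannot do.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	done := make(chan struct{})
	var wg sync.WaitGroup

	// Many waiters block on the same channel, like step-2 Acquire callers.
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // a receive on a closed channel returns immediately
		}()
	}

	close(done) // one close wakes every waiter at once
	wg.Wait()
	fmt.Println("all waiters released")
}
```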
Session.Release ¶
Session.Release removes the entry from p.sessions and pushes the worker back onto p.available. It does NOT close or kill the worker — the binary keeps running and will be assigned to the next caller.
Crash path ¶
processWorker.monitor() calls p.onCrash(sessionID) when it detects that the subprocess exited while holding a session. onCrash:
- Removes the session from p.sessions.
- If there is a pending inflight chan for the same sessionID, closes it so any goroutine waiting in step 2 of Acquire unblocks (they will then get an error because Acquire finds neither a session nor a valid inflight).
- Calls the user-supplied crashHandler if set.
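The three steps above fit in one critical section. This is a sketch, not the real pool.go: the worker is stood in by a plain string, and calling the crash handler outside the lock is an assumption about ordering that the real code may or may not share.

```go
package main

import (
	"fmt"
	"sync"
)

type pool struct {
	mu           sync.Mutex
	sessions     map[string]string        // sessionID → workerID (stand-in for Worker[C])
	inflight     map[string]chan struct{} // pending Acquires per sessionID
	crashHandler func(sessionID string)   // optional user callback
}

// onCrash mirrors the three steps listed above.
func (p *pool) onCrash(sessionID string) {
	p.mu.Lock()
	delete(p.sessions, sessionID) // 1. drop the dead binding
	if ch, ok := p.inflight[sessionID]; ok {
		delete(p.inflight, sessionID)
		close(ch) // 2. unblock step-2 waiters; they re-check and get an error
	}
	h := p.crashHandler
	p.mu.Unlock()
	if h != nil {
		h(sessionID) // 3. user callback, kept outside the lock
	}
}

func main() {
	p := &pool{
		sessions:     map[string]string{"s1": "worker-3"},
		inflight:     map[string]chan struct{}{"s1": make(chan struct{})},
		crashHandler: func(sid string) { fmt.Println("lost session", sid) },
	}
	p.onCrash("s1")
	fmt.Println(len(p.sessions), len(p.inflight))
}
```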
factory.go — WorkerFactory implementations.
What lives here ¶
- ProcessFactory: the default factory. Spawns any OS binary, assigns it an OS-allocated TCP port, and polls GET <address>/health until 200 OK. Users pass this to New[C] so they never have to implement WorkerFactory themselves for the common HTTP case.
Port allocation ¶
Ports are assigned by the OS (net.Listen("tcp", "127.0.0.1:0")). The binary receives its port via the PORT environment variable AND via any arg that contains the literal string "{{.Port}}" — that token is replaced with the actual port number at spawn time.
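The two mechanisms above can be sketched as follows. The helper names `freePort` and `expandArgs` are illustrative, not the factory's real functions; only the `net.Listen("tcp", "127.0.0.1:0")` trick and the `{{.Port}}` token come from the documentation.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// freePort asks the OS for an unused TCP port by listening on port 0.
// Note the classic caveat: the port could in principle be taken between
// closing the listener and spawning the worker.
func freePort() (int, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer l.Close()
	return l.Addr().(*net.TCPAddr).Port, nil
}

// expandArgs replaces the {{.Port}} token in each arg with the real port,
// as described above; the same value also goes into the PORT env var.
func expandArgs(args []string, port int) []string {
	out := make([]string, len(args))
	for i, a := range args {
		out[i] = strings.ReplaceAll(a, "{{.Port}}", strconv.Itoa(port))
	}
	return out
}

func main() {
	port, err := freePort()
	if err != nil {
		panic(err)
	}
	args := expandArgs([]string{"serve", "--port", "{{.Port}}"}, port)
	fmt.Println(args, "PORT="+strconv.Itoa(port))
}
```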
Health polling ¶
After the process starts, ProcessFactory polls GET <address>/health every 200ms, up to 30 attempts (6 seconds total). If the worker never responds with 200 OK, Spawn returns an error and kills the process. The concrete port + binary are logged at startup.
Crash monitoring ¶
A background goroutine calls cmd.Wait(). On exit, if the worker still holds a sessionID the pool's onCrash callback is invoked so the session affinity map is cleaned up.
pool_ttl.go — Idle session TTL sweeper for Pool[C].
Why a separate file? ¶
pool.go owns the concurrency model (Acquire, Release, onCrash). This file owns time-based session lifecycle — kept separate so each file has one job and can be read/reviewed in isolation.
How it works ¶
Every session that enters p.sessions also gets an entry in p.lastAccessed (a map[string]time.Time guarded by the same p.mu lock). Every call to Acquire that hits the fast path "touches" the session by updating its timestamp. The sweeper goroutine wakes up every ttl/2 and evicts sessions whose timestamp is older than cfg.ttl, calling release() on each so the worker is returned to the available channel.
Concurrency ¶
p.lastAccessed is always read/written under p.mu — the same lock that guards p.sessions and p.inflight. No extra synchronization is needed.
Disabling TTL ¶
Pass WithTTL(0) to New[C]. The ttlSweepLoop in pool.go returns immediately when cfg.ttl == 0, so this file's sweeper is never started.
Index ¶
- Variables
- type Option
- type Pool
- type PoolStats
- type ProcessFactory
- func (f *ProcessFactory) Spawn(ctx context.Context) (Worker[*http.Client], error)
- func (f *ProcessFactory) WithEnv(kv string) *ProcessFactory
- func (f *ProcessFactory) WithHealthPath(path string) *ProcessFactory
- func (f *ProcessFactory) WithStartHealthCheckDelay(d time.Duration) *ProcessFactory
- func (f *ProcessFactory) WithStartTimeout(d time.Duration) *ProcessFactory
- type Session
- type Worker
- type WorkerFactory
Constants ¶
This section is empty.
Variables ¶
var ErrWorkerDead = errors.New("worker process has died")
Functions ¶
This section is empty.
Types ¶
type Option ¶
type Option func(*config)
Option is a functional option for New[C].
func WithAutoScale ¶
WithAutoScale sets the minimum and maximum number of live workers.
- min: the pool always keeps at least this many workers healthy, even with zero active sessions.
- max: hard cap on concurrent workers; Acquire blocks once this limit is reached until a worker becomes available.
Panics if min < 1 or max < min.
func WithCrashHandler ¶
WithCrashHandler registers a callback invoked when a worker's subprocess exits unexpectedly while it holds an active session.
fn receives the sessionID that was lost. Use it to:
- Delete session-specific state in your database
- Return an error to the end user ("your session was interrupted")
- Trigger a re-run of the failed job
fn is called from a background monitor goroutine. It must not block for extended periods; spawn a goroutine if you need to do heavy work.
If WithCrashHandler is not set, crashes are only logged.
func WithHealthInterval ¶
WithHealthInterval sets how often the pool's background health-check loop calls Worker.Healthy() on every live worker.
Shorter intervals detect unhealthy workers faster but add more HTTP/RPC overhead. The default (5s) is a good balance for most workloads.
Set d = 0 to disable background health checks entirely. Workers are still checked once during Acquire (step 5 of the singleflight protocol).
func WithStartHealthCheckDelay ¶
WithStartHealthCheckDelay delays the first health check, giving the process time to start and breathe before the polling begins.
func WithTTL ¶
WithTTL sets the idle-session timeout.
A session is considered idle when no Acquire call has touched it within d. When the TTL fires, the session is removed from the affinity map and its worker is returned to the available pool.
Set d = 0 to disable TTL (sessions live until explicitly Released or the pool shuts down). This is useful for REPL-style processes where the caller owns the session lifetime.
func WithWorkerReuse ¶
WithWorkerReuse controls whether a worker is recycled when its session's TTL expires. If true (the default), the worker is returned to the available pool to serve new sessions. If false, the worker process is killed when the session expires, and a fresh worker is spawned to maintain the minimum pool capacity.
type Pool ¶
type Pool[C any] struct {
	// contains filtered or unexported fields
}
Pool manages a set of workers and routes requests by sessionID. Create one with New[C].
func New ¶
func New[C any](factory WorkerFactory[C], opts ...Option) (*Pool[C], error)
New creates a pool backed by factory, applies opts, and starts min workers. Returns an error if any of the initial workers fail to start.
func (*Pool[C]) Acquire ¶
Acquire returns the Worker pinned to sessionID.
If sessionID already has a worker, it is returned immediately (fast path). If sessionID is new, a free worker is popped from the available channel, health-checked, and pinned to the session.
If another goroutine is currently acquiring the same sessionID, this call blocks until that acquisition completes and then returns the same worker (singleflight guarantee — no two goroutines can pin different workers to the same sessionID simultaneously).
Blocks until a worker is available or ctx is cancelled.
type PoolStats ¶
type PoolStats struct {
	// TotalWorkers is the number of workers currently registered in the pool
	// (starting + healthy + busy). Does not count workers being scaled down.
	TotalWorkers int

	// AvailableWorkers is the number of idle workers ready to accept a new session.
	AvailableWorkers int

	// ActiveSessions is the number of sessionID → worker bindings currently live.
	ActiveSessions int

	// InflightAcquires is the number of Acquire calls currently in the "slow path"
	// (waiting for a worker to become available). Useful for queue-depth alerting.
	InflightAcquires int
}
PoolStats is a point-in-time snapshot of pool state for dashboards / alerts.
type ProcessFactory ¶
type ProcessFactory struct {
// contains filtered or unexported fields
}
ProcessFactory is the default WorkerFactory[*http.Client]. It spawns `binary` as a subprocess, allocates a free OS port, and polls GET <address>/health until the worker reports healthy.
Use NewProcessFactory to create one; pass it directly to New[C]:
pool, err := herd.New(herd.NewProcessFactory("./my-binary", "--port", "{{.Port}}"))
func NewProcessFactory ¶
func NewProcessFactory(binary string, args ...string) *ProcessFactory
NewProcessFactory returns a ProcessFactory that spawns the given binary.
Any arg containing the literal string "{{.Port}}" is replaced with the OS-assigned port number at spawn time. The port is also injected via the PORT environment variable for binaries that prefer env-based config.
factory := herd.NewProcessFactory("./ollama", "serve", "--port", "{{.Port}}")
func (*ProcessFactory) Spawn ¶
Spawn implements WorkerFactory[*http.Client]. It allocates a free port, starts the binary, and blocks until the worker passes a /health check or ctx is cancelled.
func (*ProcessFactory) WithEnv ¶
func (f *ProcessFactory) WithEnv(kv string) *ProcessFactory
WithEnv appends an extra KEY=VALUE environment variable that is injected into every worker spawned by this factory. The literal string "{{.Port}}" is replaced with the worker's allocated port number, which is useful for binaries that accept the listen address via an env var rather than a flag.
factory := herd.NewProcessFactory("ollama", "serve").
WithEnv("OLLAMA_HOST=127.0.0.1:{{.Port}}").
WithEnv("OLLAMA_MODELS=/tmp/shared-ollama-models")
func (*ProcessFactory) WithHealthPath ¶
func (f *ProcessFactory) WithHealthPath(path string) *ProcessFactory
WithHealthPath sets the HTTP path that herd polls to decide whether a worker is ready. The path must return HTTP 200 when the process is healthy.
Default: "/health"
Use this for binaries that expose liveness on a non-standard path:
factory := herd.NewProcessFactory("ollama", "serve").
WithHealthPath("/") // ollama serves GET / → 200 "Ollama is running"
func (*ProcessFactory) WithStartHealthCheckDelay ¶
func (f *ProcessFactory) WithStartHealthCheckDelay(d time.Duration) *ProcessFactory
WithStartHealthCheckDelay delays the first health check, giving the process time to start and breathe before the polling begins.
func (*ProcessFactory) WithStartTimeout ¶
func (f *ProcessFactory) WithStartTimeout(d time.Duration) *ProcessFactory
WithStartTimeout sets the maximum duration herd will poll the worker's health endpoint after spawning the process before giving up and killing it.
Default: 30 seconds
type Session ¶
type Session[C any] struct {
	// ID is the sessionID that was passed to Pool.Acquire.
	ID string

	// Worker is the underlying worker pinned to this session.
	// Use Worker.Client() to talk to the subprocess.
	Worker Worker[C]
	// contains filtered or unexported fields
}
Session is a scoped handle returned by Pool.Acquire.
It binds one sessionID to one worker for the duration of the session. Call Release when the session is done — this frees the worker so it can be assigned to the next sessionID. Failing to call Release leaks a worker.
A Session is NOT safe for concurrent use by multiple goroutines. Multiple HTTP requests for the same sessionID should each call Acquire independently; the pool guarantees they always receive the same underlying worker.
func (*Session[C]) ConnRelease ¶
func (s *Session[C]) ConnRelease()
type Worker ¶
type Worker[C any] interface {
	// ID returns a stable, unique identifier for this worker (e.g. "worker-3").
	// Never reused — not even after a crash and restart.
	ID() string

	// Address returns the internal network URI the worker
	// is listening on (e.g., '127.0.0.1:54321').
	Address() string

	// Client returns the typed connection to the worker process.
	// For most users this is *http.Client; gRPC users return their stub here.
	Client() C

	// Healthy performs a liveness check against the subprocess.
	// Returns nil if the worker is accepting requests; non-nil otherwise.
	// Pool.Acquire calls this before handing a worker to a new session,
	// so a stale or crashed worker is never returned to a caller.
	Healthy(ctx context.Context) error

	// Close performs graceful shutdown of the worker process.
	// Called by the pool during scale-down or Pool.Shutdown.
	io.Closer
}
Worker represents one running subprocess managed by the pool.
C is the typed client the caller uses to talk to the subprocess — for example *http.Client, a gRPC connection, or a custom struct. The type parameter is constrained to "any" so the pool is fully generic.
type WorkerFactory ¶
type WorkerFactory[C any] interface {
	// Spawn starts one new worker and blocks until it is healthy.
	// If ctx is cancelled before the worker becomes healthy, Spawn must
	// kill the process and return a non-nil error.
	Spawn(ctx context.Context) (Worker[C], error)
}
WorkerFactory knows how to spawn one worker process and return a typed Worker[C] that is ready to accept requests (i.e. Healthy returns nil).
Most users never implement this interface — they use NewProcessFactory instead. Implement WorkerFactory only if you need custom spawn logic (e.g. Firecracker microVM, Docker container, remote SSH process).
