Documentation ¶
Overview ¶
Package toc provides a constrained stage runner inspired by Drum-Buffer-Rope (Theory of Constraints), with pipeline composition via Pipe and NewBatcher.
A Stage owns a bounded input queue and one or more workers with bounded concurrency. Producers submit items via Stage.Submit; the stage processes them through fn and emits results on Stage.Out. When the stage is saturated, Submit blocks — this is the "rope" limiting upstream WIP.
The stage tracks constraint utilization, idle time, and output-blocked time via Stage.Stats, helping operators assess whether this stage is acting as the constraint and whether downstream backpressure is suppressing throughput.
Single-Stage Lifecycle ¶
- Start a stage with Start
- Submit items with Stage.Submit from one or more goroutines
- Call Stage.CloseInput when all submissions are done (see below)
- Read results from Stage.Out until closed — must drain to completion
- Call Stage.Wait (or Stage.Cause) to block until shutdown completes

Or combine steps 4-5: Stage.DiscardAndWait / Stage.DiscardAndCause drain Out and wait in a single call.
The goroutine or coordinator that knows no more submissions will occur owns CloseInput. In single-producer code, deferring CloseInput in the submitting goroutine is a good safety net. With multiple producers, a coordinator should call CloseInput after all producers finish (e.g., after a sync.WaitGroup). CloseInput is also called internally on fail-fast error or parent context cancellation, so the input side closes automatically on abnormal paths.
Cardinality: under the liveness conditions below, every Stage.Submit that returns nil yields exactly one rslt.Result on Stage.Out. Submit calls that return an error produce no result.
Operational notes: callers must drain Stage.Out until closed, or use Stage.DiscardAndWait / Stage.DiscardAndCause. If callers stop draining Out, workers block on result delivery and Stage.Wait / Stage.Cause may never return. If fn blocks forever or ignores cancellation, the stage leaks goroutines and never completes. See Stage.Out for full liveness details. Total stage WIP (item count) is up to Capacity (buffered) + Workers (in-flight). See Stage.Cause for terminal-status semantics (Wait vs Cause). See Stage.Wait for completion semantics.
Pipeline Composition ¶
Pipe composes stages by reading from an upstream Result channel, forwarding Ok values to workers and passing Err values directly to the output (error passthrough). NewBatcher accumulates items into fixed-count batches between stages. NewWeightedBatcher accumulates items into weight-based batches (flush when accumulated weight reaches threshold).
Pipelines have two error planes: data-plane errors (per-item rslt.Err in Stage.Out) and control-plane errors (Stage.Wait / Stage.Cause). Forwarded upstream errors are data-plane only — they never trigger fail-fast in the downstream stage.
See the package README for pipeline lifecycle contract, cancellation topology, and selection rubric (hof.PipeErr vs toc.Pipe).
This package is for pipelines with a known bottleneck stage. If you don't know your constraint, profile first.
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/binaryphile/fluentfp/toc"
)

func main() {
	// double concatenates a string with itself.
	double := func(_ context.Context, s string) (string, error) {
		return s + s, nil
	}

	ctx := context.Background()

	// Start's ctx governs stage lifetime; Submit's ctx bounds only admission.
	stage := toc.Start(ctx, double, toc.Options[string]{Capacity: 3, Workers: 1})

	go func() {
		defer stage.CloseInput()
		for _, item := range []string{"a", "b", "c"} {
			if err := stage.Submit(ctx, item); err != nil {
				break
			}
		}
	}()

	for result := range stage.Out() {
		val, err := result.Unpack()
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(val)
	}

	if err := stage.Wait(); err != nil {
		fmt.Println("stage error:", err)
	}
}
Output:

aa
bb
cc
Example (Pipe) ¶
Example_pipe demonstrates basic error passthrough through a Pipe stage.
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/binaryphile/fluentfp/rslt"
	"github.com/binaryphile/fluentfp/toc"
)

func main() {
	ctx := context.Background()

	src := make(chan rslt.Result[int], 3)
	src <- rslt.Ok(10)
	src <- rslt.Err[int](errors.New("oops"))
	src <- rslt.Ok(20)
	close(src)

	// doubleFn doubles the input.
	doubleFn := func(_ context.Context, n int) (int, error) {
		return n * 2, nil
	}

	stage := toc.Pipe(ctx, src, doubleFn, toc.Options[int]{})

	for r := range stage.Out() {
		if v, err := r.Unpack(); err != nil {
			fmt.Println("error:", err)
		} else {
			fmt.Println(v)
		}
	}
	stage.Wait()

	// Forwarded errors bypass the worker queue, so the error
	// may arrive before queued Ok results complete.
}
Output:

error: oops
20
40
Example (Pipeline) ¶
Example_pipeline demonstrates a four-handle pipeline modeled on the era-indexer: Start → Batcher → Pipe → Pipe, with error passthrough and reverse-order Wait.
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/binaryphile/fluentfp/toc"
)

func main() {
	ctx := context.Background()

	// Stage 1: "chunk" each number into a string representation.
	chunkFn := func(_ context.Context, n int) (string, error) {
		if n == 3 {
			return "", errors.New("bad input: 3")
		}
		return fmt.Sprintf("chunk(%d)", n), nil
	}
	chunker := toc.Start(ctx, chunkFn, toc.Options[int]{Capacity: 5, ContinueOnError: true})

	// Stage 2: Batch strings into groups of 2.
	batched := toc.NewBatcher(ctx, chunker.Out(), 2)

	// Stage 3: "embed" each batch by joining its items.
	embedFn := func(_ context.Context, batch []string) (string, error) {
		result := ""
		for i, s := range batch {
			if i > 0 {
				result += "+"
			}
			result += s
		}
		return fmt.Sprintf("embed[%s]", result), nil
	}
	embedder := toc.Pipe(ctx, batched.Out(), embedFn, toc.Options[[]string]{})

	// Stage 4: "store" each value by wrapping it.
	storeFn := func(_ context.Context, s string) (string, error) {
		return fmt.Sprintf("store(%s)", s), nil
	}
	storer := toc.Pipe(ctx, embedder.Out(), storeFn, toc.Options[string]{})

	// Feed the head stage.
	go func() {
		defer chunker.CloseInput()
		for _, n := range []int{1, 2, 3, 4, 5} {
			if err := chunker.Submit(ctx, n); err != nil {
				break
			}
		}
	}()

	// Drain the tail.
	for r := range storer.Out() {
		if v, err := r.Unpack(); err != nil {
			fmt.Println("error:", err)
		} else {
			fmt.Println(v)
		}
	}

	// Wait in reverse order (recommended).
	storer.Wait()
	embedder.Wait()
	batched.Wait()
	chunker.Wait()

	// Forwarded errors bypass worker queues, so the error from item 3
	// may arrive before the batch containing items 1-2 is processed.
}
Output:

error: bad input: 3
store(embed[chunk(1)+chunk(2)])
store(embed[chunk(4)+chunk(5)])
Index ¶
- Variables
- type Batcher
- type BatcherStats
- type Options
- type Stage
- func (s *Stage[T, R]) Cause() error
- func (s *Stage[T, R]) CloseInput()
- func (s *Stage[T, R]) DiscardAndCause() error
- func (s *Stage[T, R]) DiscardAndWait() error
- func (s *Stage[T, R]) Out() <-chan rslt.Result[R]
- func (s *Stage[T, R]) Stats() Stats
- func (s *Stage[T, R]) Submit(ctx context.Context, item T) error
- func (s *Stage[T, R]) Wait() error
- type Stats
- type WeightedBatcher
- type WeightedBatcherStats
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrClosed = errors.New("toc: stage closed")
ErrClosed is returned by Stage.Submit when the stage is no longer accepting input — after Stage.CloseInput, fail-fast shutdown, or parent context cancellation. See Stage.Submit for race semantics.
Functions ¶
This section is empty.
Types ¶
type Batcher ¶
type Batcher[T any] struct {
	// contains filtered or unexported fields
}
Batcher accumulates up to n Ok items from an upstream Result channel into batches, emitting each batch as rslt.Result[[]T]. Errors act as batch boundaries: the partial batch is flushed, then the error is forwarded, and a fresh accumulator starts.
Created by NewBatcher. The zero value is not usable.
func NewBatcher ¶
NewBatcher creates a Batcher that reads from src, accumulates up to n Ok values per batch, and emits batches on Out(). Errors from src act as batch boundaries: flush partial batch (if non-empty), forward the error, start fresh.
The Batcher drains src to completion (source ownership rule), provided the consumer drains Batcher.Out or ctx is canceled, and src eventually closes. Cancellation unblocks output sends but does not force-close src. After ctx cancellation, it switches to discard mode: continues reading src but discards all items without flushing partial batches. Batch emission and error forwarding are best-effort during shutdown — cancel may race with a successful send, so output may still appear after cancellation. All drops are reflected in stats. If the consumer stops reading Out and ctx is never canceled, the Batcher blocks on output delivery and cannot drain src.
Panics if n <= 0, src is nil, or ctx is nil.
func (*Batcher[T]) Out ¶
Out returns the receive-only output channel. It closes after the source channel closes and all batches have been emitted.
Callers MUST drain Out to completion. If the consumer stops reading, the Batcher blocks and cannot drain its source, potentially causing upstream deadlocks.
func (*Batcher[T]) Stats ¶
func (b *Batcher[T]) Stats() BatcherStats
Stats returns approximate metrics. See BatcherStats for caveats. Stats are only reliable as final values after Batcher.Wait returns.
type BatcherStats ¶
type BatcherStats struct {
Received int64 // individual items consumed from src
Emitted int64 // individual Ok items included in emitted batches
Forwarded int64 // Err items forwarded downstream
Dropped int64 // items lost to shutdown/cancel (includes partial batch items)
BufferedDepth int64 // current items in partial batch accumulator
BatchCount int64 // number of batch results emitted
OutputBlockedTime time.Duration // cumulative time blocked sending to out
}
BatcherStats holds metrics for a Batcher.
Invariant (after Wait): Received = Emitted + Forwarded + Dropped.
type Options ¶
type Options[T any] struct {
	// Capacity is the number of items the input buffer can hold.
	// Submit blocks when the buffer is full (the "rope").
	// Zero means unbuffered: Submit blocks until a worker dequeues.
	// Negative values panic.
	Capacity int

	// Weight returns the cost of item t for stats tracking only
	// ([Stats.InFlightWeight]). Does not affect admission — capacity
	// is count-based. Called on the Submit path, so must be cheap.
	// Must be pure, non-negative, and safe for concurrent calls.
	// If nil, every item costs 1.
	Weight func(T) int64

	// Workers is the number of concurrent fn invocations.
	// Zero means default: 1 (serial constraint — the common case).
	// Negative values panic.
	Workers int

	// ContinueOnError, when true, keeps processing after fn errors
	// instead of cancelling the stage. Default: false (fail-fast).
	ContinueOnError bool
}
Options configures a Stage.
Total stage WIP (item count) is up to Capacity (buffered) + Workers (in-flight). Capacity is always an item-count bound; [Options.Weight] affects stats only, not admission.
type Stage ¶
type Stage[T, R any] struct {
	// contains filtered or unexported fields
}
Stage is a running constrained stage. Created by Start. The zero value is not usable.
func Pipe ¶
func Pipe[T, R any](
	ctx context.Context,
	src <-chan rslt.Result[T],
	fn func(context.Context, T) (R, error),
	opts Options[T],
) *Stage[T, R]
Pipe creates a stage that reads from an upstream Result channel, forwarding Ok values to fn via workers and passing Err values directly to the output (error passthrough). The feeder goroutine drains src to completion, provided the consumer drains Stage.Out or ctx is canceled, and src eventually closes. Cancellation unblocks output sends but does not force-close src.
Error passthrough is best-effort during shutdown: if ctx is canceled or fail-fast fires, upstream Err values may be dropped instead of forwarded (reflected in [Stats.Dropped]). During normal operation, all upstream errors are forwarded.
The returned stage's input side is owned by the feeder — do not call Stage.Submit or Stage.CloseInput directly. Both are handled gracefully (no panic, no deadlock) but are misuse. External Submit calls void the stats invariant (Received will not account for externally submitted items).
Stats: [Stats.Received] = [Stats.Submitted] + [Stats.Forwarded] + [Stats.Dropped]. Forwarded errors do not trigger fail-fast and do not affect Stage.Wait.
Panics if ctx is nil, src is nil, or fn is nil.
func Start ¶
func Start[T, R any](
	ctx context.Context,
	fn func(context.Context, T) (R, error),
	opts Options[T],
) *Stage[T, R]
Start launches a constrained stage that processes items through fn.
The stage starts a cancel watcher goroutine that calls Stage.CloseInput when the context is canceled (either parent cancel or fail-fast). This ensures workers always eventually exit.
Panics if ctx is nil, fn is nil, Capacity is negative, or Workers is negative.
func (*Stage[T, R]) Cause ¶
Cause returns the latched terminal cause of the stage, blocking until shutdown completes. Like Stage.Wait, Cause does not initiate shutdown. Unlike Stage.Wait, Cause distinguishes all three outcomes:
- nil: all items completed successfully (or ContinueOnError with no cancel)
- fail-fast error: the first fn error that caused shutdown
- parent cancel cause: context.Cause of the parent context at completion
The terminal cause is latched when the last worker finishes, so Cause is stable and idempotent — it returns the same value regardless of later parent context changes. If parent cancellation races with a worker error, Cause may return either depending on observation order.
A parent cancellation that occurs after fn returns but before the worker completes result handoff is reported as stage cancellation, even though all business-level computations succeeded. Use Stage.Wait if only fail-fast errors matter.
Requires Out to be drained (see Stage.DiscardAndCause when individual results are not needed).
func (*Stage[T, R]) CloseInput ¶
func (s *Stage[T, R]) CloseInput()
CloseInput signals that no more items will be submitted. Workers finish processing buffered items, then shut down.
Blocks briefly until all in-flight Stage.Submit calls exit, then closes the input channel.
Idempotent — safe to call multiple times (use defer as safety net). Also called internally on fail-fast or parent context cancellation.
func (*Stage[T, R]) DiscardAndCause ¶
DiscardAndCause drains all remaining results from Stage.Out and returns Stage.Cause's latched terminal cause. Use when individual results are not needed but composable terminal status is required.
Requires exclusive ownership of Stage.Out — must not be called while another goroutine is reading Out. Mixing DiscardAndCause with direct Out consumption causes a consumption race (results go to the wrong reader).
func (*Stage[T, R]) DiscardAndWait ¶
DiscardAndWait drains all remaining results from Stage.Out and returns Stage.Wait's error. Use when individual results are not needed.
Requires exclusive ownership of Stage.Out — must not be called while another goroutine is reading Out. Mixing DiscardAndWait with direct Out consumption causes a consumption race (results go to the wrong reader).
func (*Stage[T, R]) Out ¶
Out returns the receive-only output channel. It closes after all workers finish and all results have been sent.
Cardinality: every successful Stage.Submit produces exactly one result.
Ordering: with Workers == 1, results are delivered in submit order. With Workers > 1, result order is nondeterministic.
Callers MUST drain Out to completion (or use Stage.DiscardAndWait / Stage.DiscardAndCause):
for result := range stage.Out() {
val, err := result.Unpack()
// handle result — do NOT break out of this loop
}
After cancellation or fail-fast, Out may still emit: success results from work already in fn, ordinary error results from in-flight work, and canceled results for buffered items drained post-cancel. With Workers > 1, the fail-fast triggering error is not guaranteed to appear before cancellation results; use Stage.Wait or Stage.Cause for stage-level terminal status, not stream order.
If the consumer stops reading, workers block sending results, which prevents shutdown and causes Stage.Wait to hang — leaking goroutines, context resources, and the stage itself. This is the same contract as consuming from any Go channel-based pipeline.
func (*Stage[T, R]) Stats ¶
Stats returns approximate metrics. See Stats for caveats. Stats are only reliable as final values after all Stage.Submit calls have returned and Stage.Wait returns.
func (*Stage[T, R]) Submit ¶
Submit sends item into the stage for processing. Blocks when the buffer is full (backpressure / "rope"). Returns ErrClosed after Stage.CloseInput, fail-fast shutdown, or parent context cancellation.
If cancellation or CloseInput has already occurred before Submit is called, Submit deterministically returns ErrClosed without blocking. A Submit that is blocked or entering concurrently when shutdown fires may nondeterministically succeed or return an error, per Go select semantics — even a blocked Submit can succeed if capacity becomes available at the same instant. Items admitted during this window are processed normally (or canceled if the stage context is already done).
The ctx parameter controls only admission blocking — it is NOT passed to fn. The stage's own context (derived from the ctx passed to Start) is what fn receives. This means canceling a submitter's context does not cancel the item's processing once admitted.
Panics if ctx is nil (same as context.Context method calls). Panics if Weight returns a negative value. Note: a panic in Weight propagates to the caller (unlike fn panics, which are recovered and wrapped in rslt.PanicError). Safe for concurrent use from multiple goroutines. Safe to call concurrently with Stage.CloseInput (will not panic).
func (*Stage[T, R]) Wait ¶
Wait blocks until all workers have finished and Out is closed. Wait does not initiate shutdown — call Stage.CloseInput first. Without CloseInput, Wait blocks forever if no more items are submitted. Requires that Out is drained concurrently (see Stage.Out). The stage is complete when all workers have finished, terminal status is latched, and the done channel closes.
Returns the first observed fail-fast error, or nil. Specifically:
- In fail-fast mode: returns the first fn error that caused shutdown (nondeterministic among concurrent workers — first to acquire lock)
- In ContinueOnError mode: always returns nil (check individual results)
- On parent context cancellation: returns nil (caller already knows; errors returned by fn after parent cancel are not stored)
If parent cancellation races with a worker error, Wait may return either nil or the error depending on observation order. Use Stage.Cause for terminal status that distinguishes all three outcomes.
type Stats ¶
type Stats struct {
Submitted int64 // items accepted by Submit (successful return)
Completed int64 // items where fn returned (includes Failed and Panicked)
Failed int64 // subset of Completed where result is error (includes Panicked)
Panicked int64 // subset of Failed where fn panicked
Canceled int64 // items dequeued but not passed to fn because cancellation was observed first (not in Completed)
// Pipe-specific counters. Zero for Start-created stages.
// Invariant (after Wait): Received = Submitted + Forwarded + Dropped.
Received int64 // items consumed from src by feeder (any Result)
Forwarded int64 // upstream Err items sent directly to out (bypassed fn)
Dropped int64 // items seen but neither submitted nor forwarded (shutdown/cancel)
ServiceTime time.Duration // cumulative time fn was executing
IdleTime time.Duration // cumulative worker time waiting for input (includes startup and tail wait)
OutputBlockedTime time.Duration // cumulative worker time blocked handing result to consumer (unbuffered out channel)
BufferedDepth int64 // approximate items in queue; may transiently be negative mid-flight; 0 when Capacity is 0 (unbuffered)
InFlightWeight int64 // weighted cost of items currently in fn (stats-only, not admission)
QueueCapacity int // configured capacity
}
Stats holds approximate metrics for a Stage.
Fields are read from independent atomics, so a single Stats value is NOT a consistent snapshot — individual fields may reflect slightly different moments. Relationships like Submitted >= Completed + Canceled are not guaranteed mid-flight. Stats are only reliable as final values after all Stage.Submit calls have returned and Stage.Wait returns.
All durations are cumulative across all workers since Start. With Workers > 1, durations can exceed wall-clock time.
type WeightedBatcher ¶
type WeightedBatcher[T any] struct {
	// contains filtered or unexported fields
}
WeightedBatcher accumulates Ok items from an upstream Result channel into batches, flushing when accumulated weight OR item count reaches the threshold. Each item's weight is determined by weightFn. Errors act as batch boundaries: the partial batch is flushed, then the error is forwarded, and a fresh accumulator starts.
Created by NewWeightedBatcher. The zero value is not usable.
func NewWeightedBatcher ¶
func NewWeightedBatcher[T any](
	ctx context.Context,
	src <-chan rslt.Result[T],
	threshold int,
	weightFn func(T) int,
) *WeightedBatcher[T]
NewWeightedBatcher creates a WeightedBatcher that reads from src, accumulates Ok values, and emits batches on Out(). A batch is flushed when either accumulated weight (per weightFn) reaches threshold or item count reaches threshold — whichever comes first. The item-count fallback prevents unbounded accumulation of zero/low-weight items. Errors from src act as batch boundaries: flush partial batch (if non-empty), forward the error, start fresh.
weightFn must return a non-negative weight for each item. A panic occurs if weightFn returns a negative value.
The WeightedBatcher drains src to completion (source ownership rule), provided the consumer drains WeightedBatcher.Out or ctx is canceled, and src eventually closes. Cancellation unblocks output sends but does not force-close src. After ctx cancellation, it switches to discard mode: continues reading src but discards all items without flushing partial batches. Batch emission and error forwarding are best-effort during shutdown — cancel may race with a successful send, so output may still appear after cancellation. All drops are reflected in stats. If the consumer stops reading Out and ctx is never canceled, the WeightedBatcher blocks on output delivery and cannot drain src.
Panics if threshold <= 0, weightFn is nil, src is nil, or ctx is nil.
func (*WeightedBatcher[T]) Out ¶
func (b *WeightedBatcher[T]) Out() <-chan rslt.Result[[]T]
Out returns the receive-only output channel. It closes after the source channel closes and all batches have been emitted.
Callers MUST drain Out to completion. If the consumer stops reading, the WeightedBatcher blocks and cannot drain its source, potentially causing upstream deadlocks.
func (*WeightedBatcher[T]) Stats ¶
func (b *WeightedBatcher[T]) Stats() WeightedBatcherStats
Stats returns approximate metrics. See WeightedBatcherStats for caveats. Stats are only reliable as final values after WeightedBatcher.Wait returns.
func (*WeightedBatcher[T]) Wait ¶
func (b *WeightedBatcher[T]) Wait() error
Wait blocks until the WeightedBatcher goroutine exits. Returns ctx.Err() if context cancellation caused items to be dropped (discard mode or interrupted output send), nil otherwise. The WeightedBatcher has no fn, so there is no fail-fast error.
type WeightedBatcherStats ¶
type WeightedBatcherStats struct {
Received int64 // individual items consumed from src
Emitted int64 // individual Ok items included in emitted batches
Forwarded int64 // Err items forwarded downstream
Dropped int64 // items lost to shutdown/cancel (includes partial batch items)
BufferedDepth int64 // current items in partial batch accumulator
BufferedWeight int64 // current accumulated weight in partial batch
BatchCount int64 // number of batch results emitted
OutputBlockedTime time.Duration // cumulative time blocked sending to out
}
WeightedBatcherStats holds metrics for a WeightedBatcher.
Invariant (after Wait): Received = Emitted + Forwarded + Dropped.