Documentation ¶
Overview ¶
Package toc provides a constrained stage runner inspired by Drum-Buffer-Rope (Theory of Constraints), with pipeline composition via Pipe, NewBatcher, NewTee, and NewMerge.
A Stage owns a bounded input queue and one or more workers with bounded concurrency. Producers submit items via Stage.Submit; the stage processes them through fn and emits results on Stage.Out. When the stage is saturated, Submit blocks — this is the "rope" limiting upstream WIP.
The stage tracks constraint utilization, idle time, and output-blocked time via Stage.Stats, helping operators assess whether this stage is acting as the constraint and whether downstream backpressure is suppressing throughput.
Single-Stage Lifecycle ¶
- Start a stage with Start
- Submit items with Stage.Submit from one or more goroutines
- Call Stage.CloseInput when all submissions are done (see below)
- Read results from Stage.Out until closed — must drain to completion
- Call Stage.Wait (or Stage.Cause) to block until shutdown completes
Steps 4 and 5 can be combined: Stage.DiscardAndWait / Stage.DiscardAndCause.
The goroutine or coordinator that knows no more submissions will occur owns CloseInput. In single-producer code, deferring CloseInput in the submitting goroutine is a good safety net. With multiple producers, a coordinator should call CloseInput after all producers finish (e.g., after a sync.WaitGroup). CloseInput is also called internally on fail-fast error or parent context cancellation, so the input side closes automatically on abnormal paths.
Cardinality: under the liveness conditions below, every Stage.Submit that returns nil yields exactly one rslt.Result on Stage.Out. Submit calls that return an error produce no result.
Operational notes: callers must drain Stage.Out until closed, or use Stage.DiscardAndWait / Stage.DiscardAndCause. If callers stop draining Out, workers block on result delivery and Stage.Wait / Stage.Cause may never return. If fn blocks forever or ignores cancellation, the stage leaks goroutines and never completes. See Stage.Out for full liveness details. Total stage WIP (item count) is up to Capacity (buffered) + Workers (in-flight). See Stage.Cause for terminal-status semantics (Wait vs Cause). See Stage.Wait for completion semantics.
Pipeline Composition ¶
Pipe composes stages by reading from an upstream Result channel, forwarding Ok values to workers and passing Err values directly to the output (error passthrough). The remaining composition operators:
- NewBatcher accumulates items into fixed-count batches between stages.
- NewWeightedBatcher accumulates items into weight-based batches (flush when accumulated weight reaches threshold).
- NewTee broadcasts each item to N branches (synchronous lockstep — slowest consumer governs pace).
- NewMerge recombines multiple upstream Result channels into a single nondeterministic stream (fan-in).
- NewJoin recombines two branch results into one combined output (strict branch recombination — one item from each source, combined via a function).
Pipelines have two error planes: data-plane errors (per-item rslt.Err in Stage.Out) and control-plane errors (Stage.Wait / Stage.Cause). Forwarded upstream errors are data-plane only — they never trigger fail-fast in the downstream stage.
See the package README for pipeline lifecycle contract, cancellation topology, and selection rubric (hof.PipeErr vs toc.Pipe).
This package is for pipelines with a known bottleneck stage. If you don't know your constraint, profile first.
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/binaryphile/fluentfp/toc"
)

func main() {
	// double concatenates a string with itself.
	double := func(_ context.Context, s string) (string, error) {
		return s + s, nil
	}

	ctx := context.Background()
	// Start's ctx governs stage lifetime; Submit's ctx bounds only admission.
	stage := toc.Start(ctx, double, toc.Options[string]{Capacity: 3, Workers: 1})

	go func() {
		defer stage.CloseInput()
		for _, item := range []string{"a", "b", "c"} {
			if err := stage.Submit(ctx, item); err != nil {
				break
			}
		}
	}()

	for result := range stage.Out() {
		val, err := result.Unpack()
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(val)
	}

	if err := stage.Wait(); err != nil {
		fmt.Println("stage error:", err)
	}
}
Output:

aa
bb
cc
Example (Pipe) ¶
Example_pipe demonstrates basic error passthrough through a Pipe stage.
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/binaryphile/fluentfp/rslt"
	"github.com/binaryphile/fluentfp/toc"
)

func main() {
	ctx := context.Background()

	src := make(chan rslt.Result[int], 3)
	src <- rslt.Ok(10)
	src <- rslt.Err[int](errors.New("oops"))
	src <- rslt.Ok(20)
	close(src)

	// doubleFn doubles the input.
	doubleFn := func(_ context.Context, n int) (int, error) {
		return n * 2, nil
	}

	stage := toc.Pipe(ctx, src, doubleFn, toc.Options[int]{})

	for r := range stage.Out() {
		if v, err := r.Unpack(); err != nil {
			fmt.Println("error:", err)
		} else {
			fmt.Println(v)
		}
	}
	stage.Wait()

	// Forwarded errors bypass the worker queue, so the error
	// may arrive before queued Ok results complete.
}
Output:

error: oops
20
40
Example (Pipeline) ¶
Example_pipeline demonstrates a four-handle pipeline modeled on the era-indexer: Start → Batcher → Pipe → Pipe, with error passthrough and reverse-order Wait.
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/binaryphile/fluentfp/toc"
)

func main() {
	ctx := context.Background()

	// Stage 1: "chunk" each number into a string representation.
	chunkFn := func(_ context.Context, n int) (string, error) {
		if n == 3 {
			return "", errors.New("bad input: 3")
		}
		return fmt.Sprintf("chunk(%d)", n), nil
	}
	chunker := toc.Start(ctx, chunkFn, toc.Options[int]{Capacity: 5, ContinueOnError: true})

	// Stage 2: Batch strings into groups of 2.
	batched := toc.NewBatcher(ctx, chunker.Out(), 2)

	// Stage 3: "embed" each batch by joining.
	embedFn := func(_ context.Context, batch []string) (string, error) {
		result := ""
		for i, s := range batch {
			if i > 0 {
				result += "+"
			}
			result += s
		}
		return fmt.Sprintf("embed[%s]", result), nil
	}
	embedder := toc.Pipe(ctx, batched.Out(), embedFn, toc.Options[[]string]{})

	// Stage 4: "store" each embedded batch by wrapping it.
	storeFn := func(_ context.Context, s string) (string, error) {
		return fmt.Sprintf("store(%s)", s), nil
	}
	storer := toc.Pipe(ctx, embedder.Out(), storeFn, toc.Options[string]{})

	// Feed the head stage.
	go func() {
		defer chunker.CloseInput()
		for _, n := range []int{1, 2, 3, 4, 5} {
			if err := chunker.Submit(ctx, n); err != nil {
				break
			}
		}
	}()

	// Drain the tail.
	for r := range storer.Out() {
		if v, err := r.Unpack(); err != nil {
			fmt.Println("error:", err)
		} else {
			fmt.Println(v)
		}
	}

	// Wait in reverse order (recommended).
	storer.Wait()
	embedder.Wait()
	batched.Wait()
	chunker.Wait()

	// Forwarded errors bypass worker queues, so the error from item 3
	// may arrive before the batch containing items 1-2 is processed.
}
Output:

error: bad input: 3
store(embed[chunk(1)+chunk(2)])
store(embed[chunk(4)+chunk(5)])
Index ¶
- Variables
- type Batcher
- type BatcherStats
- type Join
- type JoinStats
- type Merge
- type MergeStats
- type MissingResultError
- type Options
- type Stage
- func (s *Stage[T, R]) Cause() error
- func (s *Stage[T, R]) CloseInput()
- func (s *Stage[T, R]) DiscardAndCause() error
- func (s *Stage[T, R]) DiscardAndWait() error
- func (s *Stage[T, R]) Out() <-chan rslt.Result[R]
- func (s *Stage[T, R]) Stats() Stats
- func (s *Stage[T, R]) Submit(ctx context.Context, item T) error
- func (s *Stage[T, R]) Wait() error
- type Stats
- type Tee
- type TeeStats
- type WeightedBatcher
- type WeightedBatcherStats
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrClosed = errors.New("toc: stage closed")
ErrClosed is returned by Stage.Submit when the stage is no longer accepting input — after Stage.CloseInput, fail-fast shutdown, or parent context cancellation. See Stage.Submit for race semantics.
Functions ¶
This section is empty.
Types ¶
type Batcher ¶
type Batcher[T any] struct {
	// contains filtered or unexported fields
}
Batcher accumulates up to n Ok items from an upstream Result channel into batches, emitting each batch as rslt.Result[[]T]. Errors act as batch boundaries: the partial batch is flushed, then the error is forwarded, and a fresh accumulator starts.
Created by NewBatcher. The zero value is not usable.
func NewBatcher ¶
NewBatcher creates a Batcher that reads from src, accumulates up to n Ok values per batch, and emits batches on Out(). Errors from src act as batch boundaries: flush partial batch (if non-empty), forward the error, start fresh.
The Batcher drains src to completion (source ownership rule), provided the consumer drains Batcher.Out or ctx is canceled, and src eventually closes. Cancellation unblocks output sends but does not force-close src. After ctx cancellation, it switches to discard mode: continues reading src but discards all items without flushing partial batches. Batch emission and error forwarding are best-effort during shutdown — cancel may race with a successful send, so output may still appear after cancellation. All drops are reflected in stats. If the consumer stops reading Out and ctx is never canceled, the Batcher blocks on output delivery and cannot drain src.
Panics if n <= 0, src is nil, or ctx is nil.
func (*Batcher[T]) Out ¶
Out returns the receive-only output channel. It closes after the source channel closes and all batches have been emitted.
Callers MUST drain Out to completion. If the consumer stops reading, the Batcher blocks and cannot drain its source, potentially causing upstream deadlocks.
func (*Batcher[T]) Stats ¶
func (b *Batcher[T]) Stats() BatcherStats
Stats returns approximate metrics. See BatcherStats for caveats. Stats are only reliable as final values after Batcher.Wait returns.
type BatcherStats ¶
type BatcherStats struct {
	Received          int64         // individual items consumed from src
	Emitted           int64         // individual Ok items included in emitted batches
	Forwarded         int64         // Err items forwarded downstream
	Dropped           int64         // items lost to shutdown/cancel (includes partial batch items)
	BufferedDepth     int64         // current items in partial batch accumulator
	BatchCount        int64         // number of batch results emitted
	OutputBlockedTime time.Duration // cumulative time blocked sending to out
}
BatcherStats holds metrics for a Batcher.
Invariant (after Wait): Received = Emitted + Forwarded + Dropped.
type Join ¶ added in v0.45.0
type Join[R any] struct {
	// contains filtered or unexported fields
}
Join is a strict branch recombination operator: it uses the first item from each of two source channels for join semantics (combine or error), then drains all remaining items from both sources (source ownership rule).
Join is designed for recombining Tee branches — each branch is expected to produce exactly one result. Missing items (source closes without producing) and extra items (source produces more than one) are contract violations handled gracefully: missing items produce MissingResultError, extra items are drained and counted in stats.
Created by NewJoin. The zero value is not usable. A Join must not be copied after first use.
func NewJoin ¶ added in v0.45.0
func NewJoin[A, B, R any](
	ctx context.Context,
	srcA <-chan rslt.Result[A],
	srcB <-chan rslt.Result[B],
	fn func(A, B) R,
) *Join[R]
NewJoin creates a Join that uses the first item from each source for join semantics, then drains remaining items. Ok/Ok pairs are combined via fn; errors are forwarded. The result is emitted on Join.Out.
Error matrix:
- Ok(a), Ok(b) → Ok(fn(a, b))
- Ok(a), Err(e) → Err(e); a discarded
- Err(e), Ok(b) → Err(e); b discarded
- Err(ea), Err(eb) → Err(errors.Join(ea, eb))
- Ok(a), missing → Err(MissingResultError{Source: "B"}); a discarded
- missing, Ok(b) → Err(MissingResultError{Source: "A"}); b discarded
- Err(e), missing → Err(errors.Join(e, MissingResultError{Source: "B"}))
- missing, Err(e) → Err(errors.Join(MissingResultError{Source: "A"}, e))
- missing, missing → no output
Each source is drained to completion (source ownership rule). Extra items beyond the first are counted in [JoinStats.ExtraA] / [JoinStats.ExtraB]. Sources may close at different times.
fn must be a pure, synchronous combiner — it must not block, perform I/O, or depend on cancellation. fn runs on the Join's only goroutine; a blocking fn prevents cancellation observation, source draining, and Join.Wait from returning. Similarly, if the consumer stops reading Join.Out without canceling, the goroutine blocks on the output send with the same consequences. If combining can fail or block, use a downstream Pipe for the error-capable, context-aware transform. Panics in fn are recovered as rslt.PanicError.
Cancellation is best-effort and observed only in the Phase 1 select and the output send. On ctx cancellation, consumed items are discarded and both sources are drained. A pre-send checkpoint catches already-observable cancellation before attempting the output send, but a result may still be emitted if the send races with cancellation (both select cases ready). Join.Wait returns the latched context error if cancellation was observed during collection or output; Join.Wait may return nil even if the context was canceled, if the goroutine completed without observing it. Drain is unconditional — sources must close for the goroutine to exit.
Panics if srcA, srcB, ctx, or fn is nil.
func (*Join[R]) Out ¶ added in v0.45.0
Out returns the receive-only output channel. At most one result will appear on this channel.
The consumer MUST drain Out() to completion or cancel the shared context. If the consumer stops reading without canceling, the goroutine blocks on the output send.
Out() is idempotent — it always returns the same channel.
func (*Join[R]) Stats ¶ added in v0.45.0
Stats returns approximate metrics. See JoinStats for caveats. Stats are only reliable as final values after Join.Wait returns.
func (*Join[R]) Wait ¶ added in v0.45.0
Wait blocks until the goroutine exits and the output channel is closed. Returns a latched context error if cancellation was observed during collection or output. May return nil even if the context was canceled, if the goroutine completed without observing it (e.g., both sources closed before cancellation was checked).
Multiple Wait() calls are safe — subsequent calls return immediately with the same value.
type JoinStats ¶ added in v0.45.0
type JoinStats struct {
	ReceivedA int64 // total items consumed from srcA
	ReceivedB int64 // total items consumed from srcB
	Combined  int64 // successful fn(a,b) combinations (0 or 1)
	Errors    int64 // error results delivered to Out (0 or 1)

	DiscardedA int64 // A items consumed but not part of a successful combination (error, missing, cancel, panic)
	DiscardedB int64 // B items consumed but not part of a successful combination (error, missing, cancel, panic)
	ExtraA     int64 // A items beyond the first, drained after the join decision (contract violation)
	ExtraB     int64 // B items beyond the first, drained after the join decision (contract violation)

	OutputBlockedTime time.Duration // time blocked sending result to out
}
JoinStats holds metrics for a Join.
Fields are read from independent atomics, so a mid-flight Stats value is NOT a consistent snapshot. Invariants are guaranteed only after Join.Wait returns.
Conservation invariant (after Wait):
- ReceivedA = Combined + DiscardedA + ExtraA
- ReceivedB = Combined + DiscardedB + ExtraB
- Combined + Errors <= 1
Counter precedence: DiscardedX counts only first items that were consumed but not successfully combined (error, cancel, panic, or other side missing). ExtraX counts items beyond the first, drained after the join decision is reached. Post-decision items are always classified as ExtraX, even if cancellation later prevents result delivery.
type Merge ¶ added in v0.45.0
type Merge[T any] struct {
	// contains filtered or unexported fields
}
Merge is a nondeterministic interleaving fan-in from N sources.
One goroutine per source forwards items to a shared unbuffered output channel. The Go runtime scheduler determines send order — no cross-source ordering guarantee, no fairness guarantee, no provenance tracking. Per-source order IS preserved: items from each individual source appear in the merged output in the same order they were received from that source (follows from one goroutine per source with sequential receive/send).
Merge is NOT the inverse of Tee. Tee broadcasts identical items to all branches. Merge interleaves distinct items from independent sources. Tee → ... → Merge does not restore original ordering, does not correlate outputs from sibling branches, and does not pair items across sources.
Created by NewMerge. The zero value is not usable. A Merge must not be copied after first use.
func NewMerge ¶ added in v0.45.0
NewMerge creates a Merge that reads from each source and forwards all items to a single unbuffered output channel. Items are interleaved nondeterministically — the Go runtime scheduler determines send order.
Each source is drained to completion by its own goroutine (source ownership rule). Sources may close at different times — early closure of one source does not affect others. All sources must be finite and must eventually close (including on cancellation paths). If a source never closes, the corresponding goroutine blocks indefinitely and Merge.Wait hangs.
Cancellation is advisory, not a hard stop. On ctx cancellation, each source goroutine enters discard mode at its next cancellation checkpoint: stops forwarding but continues draining its source to completion. Two cancellation checkpoints per iteration: a non-blocking pre-send check and a blocking send-select. This bounds post-cancel forwarding to at most 1 item per source goroutine that has already passed the pre-send checkpoint. Cancel-aware sends ensure goroutines are not blocked on output when downstream stops reading. If a goroutine is blocked waiting on its source (for r := range src) when ctx cancels, it does not observe cancellation until the source produces an item or closes.
Merge.Wait returns only after all source goroutines exit — which requires all sources to close. Cancellation alone does not guarantee prompt return. After observing cancellation, each goroutine drains and discards remaining items from its source until that source closes.
Merge.Out is closed before done is closed, so Out() is guaranteed closed before Wait() returns. Callers can safely range Out() and then call Wait().
Goroutine lifecycle: constructor launches N source goroutines + 1 closer goroutine. Each source goroutine drains its source and sends to output. The closer goroutine waits on a WaitGroup for all source goroutines, closes output, and closes done.
Each source channel must be distinct and exclusively owned by the Merge. Passing the same channel twice creates two goroutines racing on one source — per-source ordering and stats become meaningless. The constructor does not check for duplicates.
Panics if len(sources) == 0, ctx is nil, or any source is nil.
func (*Merge[T]) Out ¶ added in v0.45.0
Out returns the receive-only output channel. All items from all sources appear on this single channel in nondeterministic order.
The consumer MUST drain Out() to completion or cancel the shared context. If the consumer stops reading without canceling, all source goroutines block on the shared output send and cannot drain their sources.
Out() is idempotent — it always returns the same channel.
func (*Merge[T]) Stats ¶ added in v0.45.0
func (m *Merge[T]) Stats() MergeStats
Stats returns approximate metrics. See MergeStats for caveats.
Per-source counters are loaded once into plain int64 slices, then aggregates are computed from those copied values. This guarantees single-call coherence: Received == sum(SourceReceived) within one Stats() return, even mid-flight.
Stats are only reliable as final values after Merge.Wait returns.
func (*Merge[T]) Wait ¶ added in v0.45.0
Wait blocks until all source goroutines exit and the output channel is closed. Returns a latched context error if any source goroutine entered a cancel path (pre-send checkpoint or send-select), nil otherwise.
Wait may return nil even if ctx was canceled. This happens when no goroutine observes cancellation on a checked path — e.g., all sources close before any goroutine loops back to the pre-send check, or a goroutine is blocked in range src when cancel fires and the source closes without sending. This is intentional: the operator completed its work, cancellation had no observable effect on forwarding, and reporting it would be a false positive.
Multiple Wait() calls are safe — subsequent calls return immediately with the same value.
type MergeStats ¶ added in v0.45.0
type MergeStats struct {
	Received  int64 // total items consumed from all sources
	Forwarded int64 // items sent to output
	Dropped   int64 // items discarded during cancel

	// Per-source stats. Len == N (number of sources).
	// Index i corresponds to sources[i] as passed to NewMerge.
	SourceReceived  []int64
	SourceForwarded []int64
	SourceDropped   []int64
}
MergeStats holds metrics for a Merge.
Fields are derived from per-source atomic counters, so a mid-flight Stats value is NOT a consistent snapshot — cross-metric invariants (e.g., Received == Forwarded + Dropped) may not hold because an in-flight item has been counted as received but not yet as forwarded or dropped. Invariants are guaranteed only after Merge.Wait returns.
Per-metric aggregates are coherent within a single Stats() call even mid-flight: Received == sum(SourceReceived), Forwarded == sum(SourceForwarded), Dropped == sum(SourceDropped). This holds because aggregates are computed from the same copied values.
Invariant (after Wait): Received = Forwarded + Dropped. Per-source invariant (after Wait): SourceReceived[i] = SourceForwarded[i] + SourceDropped[i].
SourceReceived[i] corresponds to sources[i] as passed to NewMerge. The index mapping is stable and matches construction order.
type MissingResultError ¶ added in v0.45.0
type MissingResultError struct {
	Source string // "A" or "B"
}
MissingResultError indicates a source closed without producing a result. Callers can use errors.As to extract the Source field and determine which side was missing.
func (*MissingResultError) Error ¶ added in v0.45.0
func (e *MissingResultError) Error() string
type Options ¶
type Options[T any] struct {
	// Capacity is the number of items the input buffer can hold.
	// Submit blocks when the buffer is full (the "rope").
	// Zero means unbuffered: Submit blocks until a worker dequeues.
	// Negative values panic.
	Capacity int

	// Weight returns the cost of item t for stats tracking only
	// ([Stats.InFlightWeight]). Does not affect admission — capacity
	// is count-based. Called on the Submit path, so must be cheap.
	// Must be pure, non-negative, and safe for concurrent calls.
	// If nil, every item costs 1.
	Weight func(T) int64

	// Workers is the number of concurrent fn invocations.
	// Zero means default: 1 (serial constraint — the common case).
	// Negative values panic.
	Workers int

	// ContinueOnError, when true, keeps processing after fn errors
	// instead of cancelling the stage. Default: false (fail-fast).
	ContinueOnError bool

	// TrackAllocations, when true, samples process-wide heap allocation
	// counters (runtime/metrics /gc/heap/allocs:bytes and :objects) before
	// and after each fn invocation and accumulates the deltas into
	// [Stats.ObservedAllocBytes] and [Stats.ObservedAllocObjects].
	//
	// Scope: each sample captures the invocation window of a single fn
	// call. Counters are process-global — they include allocations by any
	// goroutine during that window, not just the stage's own work.
	//
	// Concurrent over-attribution: with Workers > 1, overlapping
	// invocation windows can each capture the same unrelated allocation,
	// so per-stage totals can exceed actual process allocations. Totals
	// are also not additive across stages for the same reason.
	//
	// Overhead: on the order of 1µs per item in single-worker throughput
	// benchmarks (two runtime/metrics.Read calls plus counter extraction
	// and atomic accumulation). Negligible when fn does real work;
	// roughly doubles overhead for no-op or sub-microsecond fns.
	// Multi-worker contention on shared atomic counters may add cost.
	//
	// Default: false (disabled). Enable when diagnosing allocation-heavy
	// stages. Silently disabled if the runtime does not support the
	// required metrics (validated on first use via sync.Once).
	TrackAllocations bool
}
Options configures a Stage.
Total stage WIP (item count) is up to Capacity (buffered) + Workers (in-flight). Capacity is always an item-count bound; [Options.Weight] affects stats only, not admission.
type Stage ¶
type Stage[T, R any] struct {
	// contains filtered or unexported fields
}
Stage is a running constrained stage. Created by Start. The zero value is not usable.
func Pipe ¶
func Pipe[T, R any](
	ctx context.Context,
	src <-chan rslt.Result[T],
	fn func(context.Context, T) (R, error),
	opts Options[T],
) *Stage[T, R]
Pipe creates a stage that reads from an upstream Result channel, forwarding Ok values to fn via workers and passing Err values directly to the output (error passthrough). The feeder goroutine drains src to completion, provided the consumer drains Stage.Out or ctx is canceled, and src eventually closes. Cancellation unblocks output sends but does not force-close src.
Error passthrough is best-effort during shutdown: if ctx is canceled or fail-fast fires, upstream Err values may be dropped instead of forwarded (reflected in [Stats.Dropped]). During normal operation, all upstream errors are forwarded.
The returned stage's input side is owned by the feeder — do not call Stage.Submit or Stage.CloseInput directly. Both are handled gracefully (no panic, no deadlock) but are misuse. External Submit calls void the stats invariant (Received will not account for externally submitted items).
Stats: [Stats.Received] = [Stats.Submitted] + [Stats.Forwarded] + [Stats.Dropped]. Forwarded errors do not trigger fail-fast and do not affect Stage.Wait.
Panics if ctx is nil, src is nil, or fn is nil.
func Start ¶
func Start[T, R any](
	ctx context.Context,
	fn func(context.Context, T) (R, error),
	opts Options[T],
) *Stage[T, R]
Start launches a constrained stage that processes items through fn.
The stage starts a cancel watcher goroutine that calls Stage.CloseInput when the context is canceled (either parent cancel or fail-fast). This ensures workers always eventually exit.
Panics if ctx is nil, fn is nil, Capacity is negative, or Workers is negative.
func (*Stage[T, R]) Cause ¶
Cause returns the latched terminal cause of the stage, blocking until shutdown completes. Like Stage.Wait, Cause does not initiate shutdown. Unlike Stage.Wait, Cause distinguishes all three outcomes:
- nil: all items completed successfully (or ContinueOnError with no cancel)
- fail-fast error: the first fn error that caused shutdown
- parent cancel cause: context.Cause of the parent context at completion
The terminal cause is latched when the last worker finishes, so Cause is stable and idempotent — it returns the same value regardless of later parent context changes. If parent cancellation races with a worker error, Cause may return either depending on observation order.
A parent cancellation that occurs after fn returns but before the worker completes result handoff is reported as stage cancellation, even though all business-level computations succeeded. Use Stage.Wait if only fail-fast errors matter.
Requires Out to be drained (see Stage.DiscardAndCause when individual results are not needed).
func (*Stage[T, R]) CloseInput ¶
func (s *Stage[T, R]) CloseInput()
CloseInput signals that no more items will be submitted. Workers finish processing buffered items, then shut down.
Blocks briefly until all in-flight Stage.Submit calls exit, then closes the input channel.
Idempotent — safe to call multiple times (use defer as safety net). Also called internally on fail-fast or parent context cancellation.
func (*Stage[T, R]) DiscardAndCause ¶
DiscardAndCause drains all remaining results from Stage.Out and returns Stage.Cause's latched terminal cause. Use when individual results are not needed but composable terminal status is required.
Requires exclusive ownership of Stage.Out — must not be called while another goroutine is reading Out. Mixing DiscardAndCause with direct Out consumption causes a consumption race (results go to the wrong reader).
func (*Stage[T, R]) DiscardAndWait ¶
DiscardAndWait drains all remaining results from Stage.Out and returns Stage.Wait's error. Use when individual results are not needed.
Requires exclusive ownership of Stage.Out — must not be called while another goroutine is reading Out. Mixing DiscardAndWait with direct Out consumption causes a consumption race (results go to the wrong reader).
func (*Stage[T, R]) Out ¶
Out returns the receive-only output channel. It closes after all workers finish and all results have been sent.
Cardinality: every successful Stage.Submit produces exactly one result.
Ordering: with Workers == 1, results are delivered in submit order. With Workers > 1, result order is nondeterministic.
Callers MUST drain Out to completion (or use Stage.DiscardAndWait / Stage.DiscardAndCause):
for result := range stage.Out() {
	val, err := result.Unpack()
	// handle result — do NOT break out of this loop
}
After cancellation or fail-fast, Out may still emit: success results from work already in fn, ordinary error results from in-flight work, and canceled results for buffered items drained post-cancel. With Workers > 1, the fail-fast triggering error is not guaranteed to appear before cancellation results; use Stage.Wait or Stage.Cause for stage-level terminal status, not stream order.
If the consumer stops reading, workers block sending results, which prevents shutdown and causes Stage.Wait to hang — leaking goroutines, context resources, and the stage itself. This is the same contract as consuming from any Go channel-based pipeline.
func (*Stage[T, R]) Stats ¶
Stats returns approximate metrics. See Stats for caveats. Stats are only reliable as final values after all Stage.Submit calls have returned and Stage.Wait returns.
func (*Stage[T, R]) Submit ¶
Submit sends item into the stage for processing. Blocks when the buffer is full (backpressure / "rope"). Returns ErrClosed after Stage.CloseInput, fail-fast shutdown, or parent context cancellation.
If cancellation or CloseInput has already occurred before Submit is called, Submit deterministically returns ErrClosed without blocking. A Submit that is blocked or entering concurrently when shutdown fires may nondeterministically succeed or return an error, per Go select semantics — even a blocked Submit can succeed if capacity becomes available at the same instant. Items admitted during this window are processed normally (or canceled if the stage context is already done).
The ctx parameter controls only admission blocking — it is NOT passed to fn. The stage's own context (derived from the ctx passed to Start) is what fn receives. This means canceling a submitter's context does not cancel the item's processing once admitted.
Panics if ctx is nil (same as context.Context method calls). Panics if Weight returns a negative value. Note: a panic in Weight propagates to the caller (unlike fn panics, which are recovered and wrapped in rslt.PanicError). Safe for concurrent use from multiple goroutines. Safe to call concurrently with Stage.CloseInput (will not panic).
func (*Stage[T, R]) Wait ¶
Wait blocks until all workers have finished and Out is closed. Wait does not initiate shutdown — call Stage.CloseInput first. Without CloseInput, Wait blocks forever if no more items are submitted. Requires that Out is drained concurrently (see Stage.Out). The stage is complete when all workers have finished, terminal status is latched, and the done channel closes.
Returns the first observed fail-fast error, or nil. Specifically:
- In fail-fast mode: returns the first fn error that caused shutdown (nondeterministic among concurrent workers — first to acquire lock)
- In ContinueOnError mode: always returns nil (check individual results)
- On parent context cancellation: returns nil (caller already knows; errors returned by fn after parent cancel are not stored)
If parent cancellation races with a worker error, Wait may return either nil or the error depending on observation order. Use Stage.Cause for terminal status that distinguishes all three outcomes.
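The lifecycle ordering Wait depends on (close input, drain Out concurrently, then completion) can be sketched with stdlib primitives alone. This is not the toc API; `squareStage` and its parameters are illustrative:

```go
package main

import (
	"fmt"
	"sync"
)

// squareStage sketches the single-stage lifecycle: bounded input queue,
// fixed worker pool, close input when submissions end, drain output,
// then wait for completion.
func squareStage(items []int, capacity, workers int) int {
	in := make(chan int, capacity) // bounded queue: Submit-style backpressure
	out := make(chan int)          // results; workers block here if undrained
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range in { // workers exit once input closes and drains
				out <- item * item // the stage's "fn"
			}
		}()
	}
	go func() {
		wg.Wait()  // all workers finished...
		close(out) // ...so the output can close (Wait's completion condition)
	}()
	go func() {
		for _, it := range items {
			in <- it // blocks when the buffer is full (the "rope")
		}
		close(in) // analogous to Stage.CloseInput: no more submissions
	}()
	sum := 0
	for r := range out { // drain to completion, or workers stall forever
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(squareStage([]int{1, 2, 3, 4, 5}, 2, 3)) // 55
}
```

Note how closing `out` is gated on all workers finishing, mirroring the requirement that Wait cannot return until Out is drained and closed.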
type Stats ¶
type Stats struct {
Submitted int64 // items accepted by Submit (successful return)
Completed int64 // items where fn returned (includes Failed and Panicked)
Failed int64 // subset of Completed where result is error (includes Panicked)
Panicked int64 // subset of Failed where fn panicked
Canceled int64 // items dequeued but not passed to fn because cancellation was observed first (not in Completed)
// Pipe-specific counters. Zero for Start-created stages.
// Invariant (after Wait): Received = Submitted + Forwarded + Dropped.
Received int64 // items consumed from src by feeder (any Result)
Forwarded int64 // upstream Err items sent directly to out (bypassed fn)
Dropped int64 // items seen but neither submitted nor forwarded (shutdown/cancel)
ServiceTime time.Duration // cumulative time fn was executing
IdleTime time.Duration // cumulative worker time waiting for input (includes startup and tail wait)
OutputBlockedTime time.Duration // cumulative worker time blocked handing result to consumer (unbuffered out channel)
BufferedDepth int64 // approximate items in queue; may transiently be negative mid-flight; 0 when Capacity is 0 (unbuffered)
InFlightWeight int64 // weighted cost of items currently in fn (stats-only, not admission)
QueueCapacity int // configured capacity
// AllocTrackingActive reports whether allocation sampling is
// effectively enabled for this stage. False when
// [Options.TrackAllocations] was not set, or when the runtime does
// not support the required metrics. Allows callers to distinguish
// "tracking requested but unsupported" from "tracking not requested"
// or "tracking active but fn allocated zero."
AllocTrackingActive bool
// ObservedAllocBytes and ObservedAllocObjects are cumulative heap
// allocation counters sampled via runtime/metrics around each fn
// invocation. Zero when AllocTrackingActive is false.
//
// Process-global, not stage-exclusive: includes allocations by any
// goroutine during each fn invocation window. With Workers > 1,
// overlapping windows can capture the same unrelated allocation in
// multiple workers, so per-stage totals can exceed actual process
// allocations over the same period. Not additive across stages.
// Biased upward by longer service times (more background noise).
// Best used as a directional signal under stable workload where the
// stage dominates allocations, not for precise attribution. For
// exact allocation profiling, use go tool pprof.
ObservedAllocBytes uint64
ObservedAllocObjects uint64
}
Stats holds approximate metrics for a Stage.
Fields are read from independent atomics, so a single Stats value is NOT a consistent snapshot — individual fields may reflect slightly different moments. Relationships like Submitted >= Completed + Canceled are not guaranteed mid-flight. Stats are only reliable as final values after all Stage.Submit calls have returned and Stage.Wait returns.
All durations are cumulative across all workers since Start. With Workers > 1, durations can exceed wall-clock time.
type Tee ¶ added in v0.43.0
type Tee[T any] struct { // contains filtered or unexported fields }
Tee is a synchronous lockstep broadcast from one source to N branches.
Created by NewTee. The zero value is not usable. A Tee must not be copied after first use.
func NewTee ¶ added in v0.43.0
NewTee creates a Tee that reads from src and broadcasts each item to n unbuffered output branches. Items are sent to branches sequentially in index order — branch 0 first, then branch 1, etc. The slowest consumer governs pace (synchronous lockstep).
The Tee drains src to completion (source ownership rule), provided all branch consumers drain their branches or ctx is canceled, and src eventually closes. Cancellation unblocks branch sends but does not force-close src. After ctx cancellation, it switches to discard mode: continues reading src but discards all items. Branch sends are best-effort during shutdown — cancel may race with a successful send, so output may still appear after cancellation. All drops are reflected in stats. If a branch consumer stops reading and ctx is never canceled, the Tee blocks on that branch's send and stalls all branches.
Tee does not clone payloads. Reference-containing payloads (pointers, slices, maps) may alias across branches. Consumers must treat received values as immutable; mutation after receipt is a data race.
Panics if n <= 0, src is nil, or ctx is nil.
func (*Tee[T]) Branch ¶ added in v0.43.0
Branch returns the receive-only output channel for branch i.
Callers MUST drain all branches to completion or cancel the shared context. An undrained branch blocks the Tee and stalls all branches.
Panics if i is out of range [0, n).
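The lockstep broadcast and "slowest consumer governs pace" behavior can be sketched with plain channels. This is a stdlib-only illustration, not the toc implementation — it omits cancellation, discard mode, and stats:

```go
package main

import "fmt"

// lockstepTee broadcasts each value from src to n unbuffered branches,
// sending to branch 0 first, then branch 1, etc. Because every branch
// channel is unbuffered, one stalled consumer stalls all branches.
func lockstepTee(src <-chan int, n int) []<-chan int {
	outs := make([]chan int, n)
	ro := make([]<-chan int, n)
	for i := range outs {
		outs[i] = make(chan int) // unbuffered: synchronous lockstep
		ro[i] = outs[i]
	}
	go func() {
		for v := range src {
			for _, out := range outs { // index order: branch 0 first
				out <- v // no clone: branches alias reference payloads
			}
		}
		for _, out := range outs {
			close(out)
		}
	}()
	return ro
}

func main() {
	src := make(chan int)
	go func() {
		for i := 1; i <= 3; i++ {
			src <- i
		}
		close(src)
	}()
	branches := lockstepTee(src, 2)
	done := make(chan []int, 2)
	for _, b := range branches {
		go func(c <-chan int) {
			var got []int
			for v := range c {
				got = append(got, v)
			}
			done <- got
		}(b)
	}
	fmt.Println(<-done, <-done) // [1 2 3] [1 2 3]
}
```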
type TeeStats ¶ added in v0.43.0
type TeeStats struct {
Received int64 // items consumed from src
FullyDelivered int64 // items sent to ALL branches
PartiallyDelivered int64 // items sent to some branches before cancel (≥1, <N)
Undelivered int64 // items not sent to any branch (cancel before first send, or discard mode)
// Per-branch stats. Len == N (number of branches).
// BranchDelivered[i] = items successfully sent to branch i.
// BranchBlockedTime[i] = cumulative time blocked sending to branch i.
BranchDelivered []int64
BranchBlockedTime []time.Duration
}
TeeStats holds metrics for a Tee.
Fields are read from independent atomics, so a mid-flight TeeStats value is NOT a consistent snapshot — individual fields may reflect different moments. Invariants are guaranteed only after Tee.Wait returns.

Invariant (after Wait): Received = FullyDelivered + PartiallyDelivered + Undelivered.
PartiallyDelivered is at most 1 per Tee lifetime: once cancellation interrupts delivery mid-item, the goroutine enters discard mode and does not attempt delivery on subsequent items.
BranchBlockedTime[i] measures direct send-wait time on branch i. It does not measure end-to-end latency imposed by other branches. Because branches are sent in index order, earlier branches' blocked time reflects their consumer's speed directly; later branches' blocked time is near zero even if they are throttled by earlier branches.
type WeightedBatcher ¶
type WeightedBatcher[T any] struct { // contains filtered or unexported fields }
WeightedBatcher accumulates Ok items from an upstream Result channel into batches, flushing when accumulated weight OR item count reaches the threshold. Each item's weight is determined by weightFn. Errors act as batch boundaries: the partial batch is flushed, then the error is forwarded, and a fresh accumulator starts.
Created by NewWeightedBatcher. The zero value is not usable.
func NewWeightedBatcher ¶
func NewWeightedBatcher[T any](
	ctx context.Context,
	src <-chan rslt.Result[T],
	threshold int,
	weightFn func(T) int,
) *WeightedBatcher[T]
NewWeightedBatcher creates a WeightedBatcher that reads from src, accumulates Ok values, and emits batches on Out(). A batch is flushed when either accumulated weight (per weightFn) reaches threshold or item count reaches threshold — whichever comes first. The item-count fallback prevents unbounded accumulation of zero/low-weight items. Errors from src act as batch boundaries: flush partial batch (if non-empty), forward the error, start fresh.
weightFn must return a non-negative weight for each item; the batcher panics if weightFn returns a negative value.
The WeightedBatcher drains src to completion (source ownership rule), provided the consumer drains WeightedBatcher.Out or ctx is canceled, and src eventually closes. Cancellation unblocks output sends but does not force-close src. After ctx cancellation, it switches to discard mode: continues reading src but discards all items without flushing partial batches. Batch emission and error forwarding are best-effort during shutdown — cancel may race with a successful send, so output may still appear after cancellation. All drops are reflected in stats. If the consumer stops reading Out and ctx is never canceled, the WeightedBatcher blocks on output delivery and cannot drain src.
Panics if threshold <= 0, weightFn is nil, src is nil, or ctx is nil.
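The flush rule (weight threshold OR item-count fallback, plus a final partial batch on source close) can be shown with a simplified synchronous sketch. This is not the toc API — `weightedBatches` is illustrative, operates on a slice rather than a Result channel, and omits error forwarding and cancellation:

```go
package main

import "fmt"

// weightedBatches accumulates items into batches, flushing when
// accumulated weight OR item count reaches threshold — whichever comes
// first. The count fallback bounds batches of zero/low-weight items.
func weightedBatches(items []string, threshold int, weight func(string) int) [][]string {
	var out [][]string
	var cur []string
	w := 0
	for _, it := range items {
		cur = append(cur, it)
		w += weight(it)
		if w >= threshold || len(cur) >= threshold {
			out = append(out, cur)
			cur, w = nil, 0 // start a fresh accumulator
		}
	}
	if len(cur) > 0 {
		out = append(out, cur) // final partial batch when the source ends
	}
	return out
}

func main() {
	byLen := func(s string) int { return len(s) }
	fmt.Println(weightedBatches([]string{"aa", "b", "cccc", "d"}, 4, byLen))
	// [[aa b cccc] [d]]
}
```

In the example, "cccc" pushes accumulated weight to 7 ≥ 4 and flushes the first batch; "d" is left as a partial batch and emitted when the input ends.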
func (*WeightedBatcher[T]) Out ¶
func (b *WeightedBatcher[T]) Out() <-chan rslt.Result[[]T]
Out returns the receive-only output channel. It closes after the source channel closes and all batches have been emitted.
Callers MUST drain Out to completion. If the consumer stops reading, the WeightedBatcher blocks and cannot drain its source, potentially causing upstream deadlocks.
func (*WeightedBatcher[T]) Stats ¶
func (b *WeightedBatcher[T]) Stats() WeightedBatcherStats
Stats returns approximate metrics. See WeightedBatcherStats for caveats. Stats are only reliable as final values after WeightedBatcher.Wait returns.
func (*WeightedBatcher[T]) Wait ¶
func (b *WeightedBatcher[T]) Wait() error
Wait blocks until the WeightedBatcher goroutine exits. Returns ctx.Err() if context cancellation caused items to be dropped (discard mode or interrupted output send), nil otherwise. The WeightedBatcher has no fn, so there is no fail-fast error.
type WeightedBatcherStats ¶
type WeightedBatcherStats struct {
Received int64 // individual items consumed from src
Emitted int64 // individual Ok items included in emitted batches
Forwarded int64 // Err items forwarded downstream
Dropped int64 // items lost to shutdown/cancel (includes partial batch items)
BufferedDepth int64 // current items in partial batch accumulator
BufferedWeight int64 // current accumulated weight in partial batch
BatchCount int64 // number of batch results emitted
OutputBlockedTime time.Duration // cumulative time blocked sending to out
}
WeightedBatcherStats holds metrics for a WeightedBatcher.
Invariant (after Wait): Received = Emitted + Forwarded + Dropped.