package toc v0.42.0
Published: Mar 17, 2026 License: MIT Imports: 7 Imported by: 0

README

toc

Constrained stage runner inspired by Drum-Buffer-Rope (Theory of Constraints). Process items through a known bottleneck with bounded concurrency, backpressure, and constraint-centric stats.

stage := toc.Start(ctx, processChunk, toc.Options[Chunk]{Capacity: 10})

go func() {
    defer stage.CloseInput() // submitter owns closing input

    for _, chunk := range chunks {
        if err := stage.Submit(ctx, chunk); err != nil {
            break
        }
    }
}()

for result := range stage.Out() {
    val, err := result.Unpack()
    // handle result
}

err := stage.Wait()

DBR Background

If you already know DBR, skip to What It Adds.

In Goldratt's The Goal, a scout troop hike illustrates the constraint problem: the slowest hiker (Herbie) determines throughput for the whole group. Steps before the constraint can produce work faster than it can consume, so without limits the gap grows unboundedly.

Drum-Buffer-Rope (DBR) is the operational policy derived from this insight: the constraint's pace is the drum that sets the system's rhythm, a protective queue (the buffer) sits in front of the constraint so upstream stalls don't starve it, and a WIP limit (the rope) prevents upstream from outrunning the constraint.

DBR-inspired analogues in toc (approximate software mappings, not a literal factory-floor DBR implementation):

  • Constraint (bottleneck): the stage's processing capacity — fn execution bounded by Workers
  • Drum (constraint's pace): the stage's processing pace, primarily shaped by fn and Workers (actual throughput also depends on downstream consumption)
  • Buffer (protective queue): Capacity — the bounded input queue in front of the constrained step
  • Rope (WIP limit): bounded admission to the stage — Submit blocks when total WIP (Capacity + Workers) is saturated
  • Constraint monitoring: Stats — ServiceTime, IdleTime, OutputBlockedTime indicate constraint utilization and downstream pressure

The hiking analogy is from Goldratt, Eliyahu M. The Goal. North River Press, 1984. DBR applied to software in Tendon, Steve and Wolfram Müller. Hyper-Productive Knowledge Work Performance, Ch 18. J. Ross Publishing, 2015.

What It Adds Over Raw Channels

  • Bounded admission — Submit blocks when the buffer is full (the "rope")
  • Lifecycle contract — Submit → CloseInput → drain Out → Wait
  • Fail-fast default — first error cancels remaining work
  • Constraint stats — service time, idle time, output-blocked time, queue depth
  • Panic recovery — panics in fn become rslt.PanicError results with stack traces

Key Concepts

Capacity is the input buffer size. Zero means unbuffered (Submit blocks until a worker dequeues). Submit blocks when full — this is the backpressure mechanism.

Workers is the number of concurrent fn invocations. Default 1 (serial constraint — the common case).

Submit's ctx is admission-only — it controls how long Submit blocks, not what context fn receives. fn always gets the stage context (derived from Start's ctx).

Output must be drained. Workers block on the unbuffered output channel if nobody reads. Always drain Out() or use DiscardAndWait().

Stats

stats := stage.Stats()
fmt.Printf("utilization: %v service / %v total\n",
    stats.ServiceTime,
    stats.ServiceTime + stats.IdleTime + stats.OutputBlockedTime)

Stats are approximate mid-flight (independent atomics, not a snapshot). Reliable as final values after Wait returns.

Pipeline Composition

Pipe and NewBatcher compose stages into multi-stage pipelines with per-stage observability, error passthrough, and backpressure.

chunker  := toc.Start(ctx, chunkFile, Options{Workers: N, Capacity: N*2})
batched  := toc.NewBatcher(ctx, chunker.Out(), 64)
embedder := toc.Pipe(ctx, batched.Out(), embedBatch, Options{Workers: E})
storer   := toc.Pipe(ctx, embedder.Out(), storeBatch, Options{Workers: 1})

// feed the head stage
go func() {
    defer chunker.CloseInput()
    for _, file := range files {
        if err := chunker.Submit(ctx, file); err != nil {
            break
        }
    }
}()

// drain the tail
for r := range storer.Out() { ... }

// wait — reverse order recommended
storer.Wait(); embedder.Wait(); batched.Wait(); chunker.Wait()

Two Error Planes

Pipelines have two distinct error systems:

  1. Data-plane errors — rslt.Err[R] values in Out(). Per-item results. The pipeline continues processing other items. Forwarded upstream errors are always data-plane.

  2. Control-plane errors — stage execution failure via Wait() / Cause(). Terminal: "the stage itself failed." In fail-fast mode, the first fn error becomes control-plane.

Wait() returning nil does NOT mean all items succeeded — it means the stage didn't terminally fail. Check individual Out() results for item-level errors.

Pipe

Pipe creates a stage from an upstream <-chan rslt.Result[T]. Ok values go to workers; Err values pass through directly to the output (error passthrough). The feeder goroutine drains the source to completion (see Lifecycle Contract for preconditions).

The returned stage's input side is owned by the feeder — do not call Submit or CloseInput (both handled gracefully, but are misuse). External Submit calls void the stats invariant (Received will not account for externally submitted items).

Pipe stats: Received = Submitted + Forwarded + Dropped.

Batcher

NewBatcher accumulates up to n Ok items into []T batches. Errors act as batch boundaries: flush partial batch, forward error, start fresh. Each emitted batch is a fresh allocation (no aliasing).

Batcher stats: Received = Emitted + Forwarded + Dropped.

Batcher introduces up to n-1 items of hidden buffering. Downstream capacity counts batches, not original items.

WeightedBatcher

NewWeightedBatcher flushes when accumulated weight OR item count reaches threshold — whichever comes first. Each Ok item's weight is determined by weightFn func(T) int. The item-count fallback prevents unbounded accumulation of zero/low-weight items. weightFn must return non-negative values (negative panics).

Useful when items have variable cost (e.g., files with different text counts — batch until total texts >= 64, but also cap at 64 files regardless of weight).

WeightedBatcher stats: same as Batcher plus BufferedWeight (accumulated weight in partial batch). Invariant: Received = Emitted + Forwarded + Dropped.

Lifecycle Contract

Source ownership: Pipe, Batcher, and WeightedBatcher drain their source to completion. This requires two conditions: (1) the consumer drains Out() or ctx is canceled (downstream liveness), and (2) the upstream source eventually closes (upstream completion).

Cancellation solves downstream liveness — it unblocks output sends so the operator can continue draining. It does not force-close the source; if the source never closes, the operator blocks in drain/discard mode indefinitely. After cancellation, all three operators switch to discard mode (continue reading the source, discard items). If the consumer stops reading and ctx is never canceled, the operator blocks on output delivery and cannot drain its source.

Cancellation: Fail-fast is stage-local — it cancels only the stage, not upstream. For pipeline-wide shutdown, cancel the shared parent context. This favors deterministic draining over aggressive abort.

Best-effort passthrough: Error passthrough and batch emission use cancel-aware sends (select on ctx). During shutdown, a send may race with cancellation — either branch may win. This means: (1) output may still appear on Out() after cancellation if the send case wins, and (2) upstream errors may be dropped instead of forwarded if the cancel case wins. All drops are reflected in stats. During normal operation, all items are delivered.

Drain order: Drain only the tail stage's Out(). Each Pipe/Batcher drains its upstream internally. After tail Out() closes, Wait() may be called in any order. Reverse order is recommended.

Ordering: No ordering guarantee with Workers > 1. With Workers == 1, worker results preserve encounter order. However, forwarded errors bypass the worker queue, so in Pipe stages they may arrive before buffered worker results regardless of worker count.

When to Use Pipe vs hof.PipeErr

Use hof.PipeErr when transforms are cheap, one worker pool is enough, and per-step observability is unnecessary.

Use toc.Pipe when steps have different throughput/latency profiles, independent worker counts are needed, per-stage capacity matters, or you need to identify the bottleneck.

Documentation

Overview

Package toc provides a constrained stage runner inspired by Drum-Buffer-Rope (Theory of Constraints), with pipeline composition via Pipe and NewBatcher.

A Stage owns a bounded input queue and one or more workers with bounded concurrency. Producers submit items via Stage.Submit; the stage processes them through fn and emits results on Stage.Out. When the stage is saturated, Submit blocks — this is the "rope" limiting upstream WIP.

The stage tracks constraint utilization, idle time, and output-blocked time via Stage.Stats, helping operators assess whether this stage is acting as the constraint and whether downstream backpressure is suppressing throughput.

Single-Stage Lifecycle

  1. Start a stage with Start
  2. Submit items with Stage.Submit from one or more goroutines
  3. Call Stage.CloseInput when all submissions are done (see below)
  4. Read results from Stage.Out until closed — must drain to completion
  5. Call Stage.Wait (or Stage.Cause) to block until shutdown completes. Steps 4-5 can be combined with Stage.DiscardAndWait / Stage.DiscardAndCause

The goroutine or coordinator that knows no more submissions will occur owns CloseInput. In single-producer code, deferring CloseInput in the submitting goroutine is a good safety net. With multiple producers, a coordinator should call CloseInput after all producers finish (e.g., after a sync.WaitGroup). CloseInput is also called internally on fail-fast error or parent context cancellation, so the input side closes automatically on abnormal paths.

Cardinality: under the liveness conditions below, every Stage.Submit that returns nil yields exactly one rslt.Result on Stage.Out. Submit calls that return an error produce no result.

Operational notes: callers must drain Stage.Out until closed, or use Stage.DiscardAndWait / Stage.DiscardAndCause. If callers stop draining Out, workers block on result delivery and Stage.Wait / Stage.Cause may never return. If fn blocks forever or ignores cancellation, the stage leaks goroutines and never completes. See Stage.Out for full liveness details. Total stage WIP (item count) is up to Capacity (buffered) + Workers (in-flight). See Stage.Cause for terminal-status semantics (Wait vs Cause). See Stage.Wait for completion semantics.

Pipeline Composition

Pipe composes stages by reading from an upstream Result channel, forwarding Ok values to workers and passing Err values directly to the output (error passthrough). NewBatcher accumulates items into fixed-count batches between stages. NewWeightedBatcher accumulates items into weight-based batches (flush when accumulated weight reaches threshold).

Pipelines have two error planes: data-plane errors (per-item rslt.Err in Stage.Out) and control-plane errors (Stage.Wait / Stage.Cause). Forwarded upstream errors are data-plane only — they never trigger fail-fast in the downstream stage.

See the package README for pipeline lifecycle contract, cancellation topology, and selection rubric (hof.PipeErr vs toc.Pipe).

This package is for pipelines with a known bottleneck stage. If you don't know your constraint, profile first.

Example
package main

import (
	"context"
	"fmt"

	"github.com/binaryphile/fluentfp/toc"
)

func main() {
	// double concatenates a string with itself.
	double := func(_ context.Context, s string) (string, error) {
		return s + s, nil
	}

	ctx := context.Background()

	// Start's ctx governs stage lifetime; Submit's ctx bounds only admission.
	stage := toc.Start(ctx, double, toc.Options[string]{Capacity: 3, Workers: 1})

	go func() {
		defer stage.CloseInput()

		for _, item := range []string{"a", "b", "c"} {
			if err := stage.Submit(ctx, item); err != nil {
				break
			}
		}
	}()

	for result := range stage.Out() {
		val, err := result.Unpack()
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(val)
	}

	if err := stage.Wait(); err != nil {
		fmt.Println("stage error:", err)
	}

}
Output:

aa
bb
cc
Example (Pipe)

Example_pipe demonstrates basic error passthrough through a Pipe stage.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/binaryphile/fluentfp/rslt"
	"github.com/binaryphile/fluentfp/toc"
)

func main() {
	ctx := context.Background()

	src := make(chan rslt.Result[int], 3)
	src <- rslt.Ok(10)
	src <- rslt.Err[int](errors.New("oops"))
	src <- rslt.Ok(20)
	close(src)

	// doubleFn doubles the input.
	doubleFn := func(_ context.Context, n int) (int, error) {
		return n * 2, nil
	}

	stage := toc.Pipe(ctx, src, doubleFn, toc.Options[int]{})

	for r := range stage.Out() {
		if v, err := r.Unpack(); err != nil {
			fmt.Println("error:", err)
		} else {
			fmt.Println(v)
		}
	}

	stage.Wait()

	// Forwarded errors bypass the worker queue, so the error
	// may arrive before queued Ok results complete.

}
Output:

error: oops
20
40
Example (Pipeline)

Example_pipeline demonstrates a four-handle pipeline modeled on the era-indexer: Start → Batcher → Pipe → Pipe, with error passthrough and reverse-order Wait.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/binaryphile/fluentfp/toc"
)

func main() {
	ctx := context.Background()

	// Stage 1: "chunk" each number into a string representation.
	chunkFn := func(_ context.Context, n int) (string, error) {
		if n == 3 {
			return "", errors.New("bad input: 3")
		}

		return fmt.Sprintf("chunk(%d)", n), nil
	}
	chunker := toc.Start(ctx, chunkFn, toc.Options[int]{Capacity: 5, ContinueOnError: true})

	// Stage 2: Batch strings into groups of 2.
	batched := toc.NewBatcher(ctx, chunker.Out(), 2)

	// Stage 3: "embed" each batch by joining.
	embedFn := func(_ context.Context, batch []string) (string, error) {
		result := ""
		for i, s := range batch {
			if i > 0 {
				result += "+"
			}
			result += s
		}

		return fmt.Sprintf("embed[%s]", result), nil
	}
	embedder := toc.Pipe(ctx, batched.Out(), embedFn, toc.Options[[]string]{})

	// Stage 4: "store" each embedded string by wrapping it.
	storeFn := func(_ context.Context, s string) (string, error) {
		return fmt.Sprintf("store(%s)", s), nil
	}
	storer := toc.Pipe(ctx, embedder.Out(), storeFn, toc.Options[string]{})

	// Feed the head stage.
	go func() {
		defer chunker.CloseInput()
		for _, n := range []int{1, 2, 3, 4, 5} {
			if err := chunker.Submit(ctx, n); err != nil {
				break
			}
		}
	}()

	// Drain the tail.
	for r := range storer.Out() {
		if v, err := r.Unpack(); err != nil {
			fmt.Println("error:", err)
		} else {
			fmt.Println(v)
		}
	}

	// Wait in reverse order (recommended).
	storer.Wait()
	embedder.Wait()
	batched.Wait()
	chunker.Wait()

	// Forwarded errors bypass worker queues, so the error from item 3
	// may arrive before the batch containing items 1-2 is processed.

}
Output:

error: bad input: 3
store(embed[chunk(1)+chunk(2)])
store(embed[chunk(4)+chunk(5)])

Constants

This section is empty.

Variables

View Source
var ErrClosed = errors.New("toc: stage closed")

ErrClosed is returned by Stage.Submit when the stage is no longer accepting input — after Stage.CloseInput, fail-fast shutdown, or parent context cancellation. See Stage.Submit for race semantics.

Functions

This section is empty.

Types

type Batcher

type Batcher[T any] struct {
	// contains filtered or unexported fields
}

Batcher accumulates up to n Ok items from an upstream Result channel into batches, emitting each batch as rslt.Result[[]T]. Errors act as batch boundaries: the partial batch is flushed, then the error is forwarded, and a fresh accumulator starts.

Created by NewBatcher. The zero value is not usable.

func NewBatcher

func NewBatcher[T any](
	ctx context.Context,
	src <-chan rslt.Result[T],
	n int,
) *Batcher[T]

NewBatcher creates a Batcher that reads from src, accumulates up to n Ok values per batch, and emits batches on Out(). Errors from src act as batch boundaries: flush partial batch (if non-empty), forward the error, start fresh.

The Batcher drains src to completion (source ownership rule), provided the consumer drains Batcher.Out or ctx is canceled, and src eventually closes. Cancellation unblocks output sends but does not force-close src. After ctx cancellation, it switches to discard mode: continues reading src but discards all items without flushing partial batches. Batch emission and error forwarding are best-effort during shutdown — cancel may race with a successful send, so output may still appear after cancellation. All drops are reflected in stats. If the consumer stops reading Out and ctx is never canceled, the Batcher blocks on output delivery and cannot drain src.

Panics if n <= 0, src is nil, or ctx is nil.

func (*Batcher[T]) Out

func (b *Batcher[T]) Out() <-chan rslt.Result[[]T]

Out returns the receive-only output channel. It closes after the source channel closes and all batches have been emitted.

Callers MUST drain Out to completion. If the consumer stops reading, the Batcher blocks and cannot drain its source, potentially causing upstream deadlocks.

func (*Batcher[T]) Stats

func (b *Batcher[T]) Stats() BatcherStats

Stats returns approximate metrics. See BatcherStats for caveats. Stats are only reliable as final values after Batcher.Wait returns.

func (*Batcher[T]) Wait

func (b *Batcher[T]) Wait() error

Wait blocks until the Batcher goroutine exits. Returns ctx.Err() if context cancellation caused items to be dropped (discard mode or interrupted output send), nil otherwise. The Batcher has no fn, so there is no fail-fast error.

type BatcherStats

type BatcherStats struct {
	Received          int64         // individual items consumed from src
	Emitted           int64         // individual Ok items included in emitted batches
	Forwarded         int64         // Err items forwarded downstream
	Dropped           int64         // items lost to shutdown/cancel (includes partial batch items)
	BufferedDepth     int64         // current items in partial batch accumulator
	BatchCount        int64         // number of batch results emitted
	OutputBlockedTime time.Duration // cumulative time blocked sending to out
}

BatcherStats holds metrics for a Batcher.

Invariant (after Wait): Received = Emitted + Forwarded + Dropped.

type Options

type Options[T any] struct {
	// Capacity is the number of items the input buffer can hold.
	// Submit blocks when the buffer is full (the "rope").
	// Zero means unbuffered: Submit blocks until a worker dequeues.
	// Negative values panic.
	Capacity int

	// Weight returns the cost of item t for stats tracking only
	// ([Stats.InFlightWeight]). Does not affect admission — capacity
	// is count-based. Called on the Submit path, so must be cheap.
	// Must be pure, non-negative, and safe for concurrent calls.
	// If nil, every item costs 1.
	Weight func(T) int64

	// Workers is the number of concurrent fn invocations.
	// Zero means default: 1 (serial constraint — the common case).
	// Negative values panic.
	Workers int

	// ContinueOnError, when true, keeps processing after fn errors
	// instead of cancelling the stage. Default: false (fail-fast).
	ContinueOnError bool
}

Options configures a Stage.

Total stage WIP (item count) is up to Capacity (buffered) + Workers (in-flight). Capacity is always an item-count bound; [Options.Weight] affects stats only, not admission.

type Stage

type Stage[T, R any] struct {
	// contains filtered or unexported fields
}

Stage is a running constrained stage. Created by Start. The zero value is not usable.

func Pipe

func Pipe[T, R any](
	ctx context.Context,
	src <-chan rslt.Result[T],
	fn func(context.Context, T) (R, error),
	opts Options[T],
) *Stage[T, R]

Pipe creates a stage that reads from an upstream Result channel, forwarding Ok values to fn via workers and passing Err values directly to the output (error passthrough). The feeder goroutine drains src to completion, provided the consumer drains Stage.Out or ctx is canceled, and src eventually closes. Cancellation unblocks output sends but does not force-close src.

Error passthrough is best-effort during shutdown: if ctx is canceled or fail-fast fires, upstream Err values may be dropped instead of forwarded (reflected in [Stats.Dropped]). During normal operation, all upstream errors are forwarded.

The returned stage's input side is owned by the feeder — do not call Stage.Submit or Stage.CloseInput directly. Both are handled gracefully (no panic, no deadlock) but are misuse. External Submit calls void the stats invariant (Received will not account for externally submitted items).

Stats: [Stats.Received] = [Stats.Submitted] + [Stats.Forwarded] + [Stats.Dropped]. Forwarded errors do not trigger fail-fast and do not affect Stage.Wait.

Panics if ctx is nil, src is nil, or fn is nil.

func Start

func Start[T, R any](
	ctx context.Context,
	fn func(context.Context, T) (R, error),
	opts Options[T],
) *Stage[T, R]

Start launches a constrained stage that processes items through fn.

The stage starts a cancel watcher goroutine that calls Stage.CloseInput when the context is canceled (either parent cancel or fail-fast). This ensures workers always eventually exit.

Panics if ctx is nil, fn is nil, Capacity is negative, or Workers is negative.

func (*Stage[T, R]) Cause

func (s *Stage[T, R]) Cause() error

Cause returns the latched terminal cause of the stage, blocking until shutdown completes. Like Stage.Wait, Cause does not initiate shutdown. Unlike Stage.Wait, Cause distinguishes all three outcomes:

  • nil: all items completed successfully (or ContinueOnError with no cancel)
  • fail-fast error: the first fn error that caused shutdown
  • parent cancel cause: context.Cause of the parent context at completion

The terminal cause is latched when the last worker finishes, so Cause is stable and idempotent — it returns the same value regardless of later parent context changes. If parent cancellation races with a worker error, Cause may return either depending on observation order.

A parent cancellation that occurs after fn returns but before the worker completes result handoff is reported as stage cancellation, even though all business-level computations succeeded. Use Stage.Wait if only fail-fast errors matter.

Requires Out to be drained (see Stage.DiscardAndCause when individual results are not needed).

func (*Stage[T, R]) CloseInput

func (s *Stage[T, R]) CloseInput()

CloseInput signals that no more items will be submitted. Workers finish processing buffered items, then shut down.

Blocks briefly until all in-flight Stage.Submit calls exit, then closes the input channel.

Idempotent — safe to call multiple times (use defer as safety net). Also called internally on fail-fast or parent context cancellation.

func (*Stage[T, R]) DiscardAndCause

func (s *Stage[T, R]) DiscardAndCause() error

DiscardAndCause drains all remaining results from Stage.Out and returns Stage.Cause's latched terminal cause. Use when individual results are not needed but composable terminal status is required.

Requires exclusive ownership of Stage.Out — must not be called while another goroutine is reading Out. Mixing DiscardAndCause with direct Out consumption causes a consumption race (results go to the wrong reader).

func (*Stage[T, R]) DiscardAndWait

func (s *Stage[T, R]) DiscardAndWait() error

DiscardAndWait drains all remaining results from Stage.Out and returns Stage.Wait's error. Use when individual results are not needed.

Requires exclusive ownership of Stage.Out — must not be called while another goroutine is reading Out. Mixing DiscardAndWait with direct Out consumption causes a consumption race (results go to the wrong reader).

func (*Stage[T, R]) Out

func (s *Stage[T, R]) Out() <-chan rslt.Result[R]

Out returns the receive-only output channel. It closes after all workers finish and all results have been sent.

Cardinality: every successful Stage.Submit produces exactly one result.

Ordering: with Workers == 1, results are delivered in submit order. With Workers > 1, result order is nondeterministic.

Callers MUST drain Out to completion (or use Stage.DiscardAndWait / Stage.DiscardAndCause):

for result := range stage.Out() {
    val, err := result.Unpack()
    // handle result — do NOT break out of this loop
}

After cancellation or fail-fast, Out may still emit: success results from work already in fn, ordinary error results from in-flight work, and canceled results for buffered items drained post-cancel. With Workers > 1, the fail-fast triggering error is not guaranteed to appear before cancellation results; use Stage.Wait or Stage.Cause for stage-level terminal status, not stream order.

If the consumer stops reading, workers block sending results, which prevents shutdown and causes Stage.Wait to hang — leaking goroutines, context resources, and the stage itself. This is the same contract as consuming from any Go channel-based pipeline.

func (*Stage[T, R]) Stats

func (s *Stage[T, R]) Stats() Stats

Stats returns approximate metrics. See Stats for caveats. Stats are only reliable as final values after all Stage.Submit calls have returned and Stage.Wait returns.

func (*Stage[T, R]) Submit

func (s *Stage[T, R]) Submit(ctx context.Context, item T) error

Submit sends item into the stage for processing. Blocks when the buffer is full (backpressure / "rope"). Returns ErrClosed after Stage.CloseInput, fail-fast shutdown, or parent context cancellation.

If cancellation or CloseInput has already occurred before Submit is called, Submit deterministically returns ErrClosed without blocking. A Submit that is blocked or entering concurrently when shutdown fires may nondeterministically succeed or return an error, per Go select semantics — even a blocked Submit can succeed if capacity becomes available at the same instant. Items admitted during this window are processed normally (or canceled if the stage context is already done).

The ctx parameter controls only admission blocking — it is NOT passed to fn. The stage's own context (derived from the ctx passed to Start) is what fn receives. This means canceling a submitter's context does not cancel the item's processing once admitted.

Panics if ctx is nil (same as context.Context method calls). Panics if Weight returns a negative value. Note: a panic in Weight propagates to the caller (unlike fn panics, which are recovered and wrapped in rslt.PanicError). Safe for concurrent use from multiple goroutines. Safe to call concurrently with Stage.CloseInput (will not panic).

func (*Stage[T, R]) Wait

func (s *Stage[T, R]) Wait() error

Wait blocks until all workers have finished and Out is closed. Wait does not initiate shutdown — call Stage.CloseInput first. Without CloseInput, Wait blocks forever if no more items are submitted. Requires that Out is drained concurrently (see Stage.Out). The stage is complete when all workers have finished, terminal status is latched, and the done channel closes.

Returns the first observed fail-fast error, or nil. Specifically:

  • In fail-fast mode: returns the first fn error that caused shutdown (nondeterministic among concurrent workers — first to acquire lock)
  • In ContinueOnError mode: always returns nil (check individual results)
  • On parent context cancellation: returns nil (caller already knows; errors returned by fn after parent cancel are not stored)

If parent cancellation races with a worker error, Wait may return either nil or the error depending on observation order. Use Stage.Cause for terminal status that distinguishes all three outcomes.

type Stats

type Stats struct {
	Submitted int64 // items accepted by Submit (successful return)
	Completed int64 // items where fn returned (includes Failed and Panicked)
	Failed    int64 // subset of Completed where result is error (includes Panicked)
	Panicked  int64 // subset of Failed where fn panicked
	Canceled  int64 // items dequeued but not passed to fn because cancellation was observed first (not in Completed)

	// Pipe-specific counters. Zero for Start-created stages.
	// Invariant (after Wait): Received = Submitted + Forwarded + Dropped.
	Received  int64 // items consumed from src by feeder (any Result)
	Forwarded int64 // upstream Err items sent directly to out (bypassed fn)
	Dropped   int64 // items seen but neither submitted nor forwarded (shutdown/cancel)

	ServiceTime       time.Duration // cumulative time fn was executing
	IdleTime          time.Duration // cumulative worker time waiting for input (includes startup and tail wait)
	OutputBlockedTime time.Duration // cumulative worker time blocked handing result to consumer (unbuffered out channel)

	BufferedDepth  int64 // approximate items in queue; may transiently be negative mid-flight; 0 when Capacity is 0 (unbuffered)
	InFlightWeight int64 // weighted cost of items currently in fn (stats-only, not admission)
	QueueCapacity  int   // configured capacity
}

Stats holds approximate metrics for a Stage.

Fields are read from independent atomics, so a single Stats value is NOT a consistent snapshot — individual fields may reflect slightly different moments. Relationships like Submitted >= Completed + Canceled are not guaranteed mid-flight. Stats are only reliable as final values after all Stage.Submit calls have returned and Stage.Wait returns.

All durations are cumulative across all workers since Start. With Workers > 1, durations can exceed wall-clock time.

type WeightedBatcher

type WeightedBatcher[T any] struct {
	// contains filtered or unexported fields
}

WeightedBatcher accumulates Ok items from an upstream Result channel into batches, flushing when accumulated weight OR item count reaches the threshold. Each item's weight is determined by weightFn. Errors act as batch boundaries: the partial batch is flushed, then the error is forwarded, and a fresh accumulator starts.

Created by NewWeightedBatcher. The zero value is not usable.

func NewWeightedBatcher

func NewWeightedBatcher[T any](
	ctx context.Context,
	src <-chan rslt.Result[T],
	threshold int,
	weightFn func(T) int,
) *WeightedBatcher[T]

NewWeightedBatcher creates a WeightedBatcher that reads from src, accumulates Ok values, and emits batches on Out(). A batch is flushed when either accumulated weight (per weightFn) reaches threshold or item count reaches threshold — whichever comes first. The item-count fallback prevents unbounded accumulation of zero/low-weight items. Errors from src act as batch boundaries: flush partial batch (if non-empty), forward the error, start fresh.

weightFn must return a non-negative weight for each item. A panic occurs if weightFn returns a negative value.

The WeightedBatcher drains src to completion (source ownership rule), provided the consumer drains WeightedBatcher.Out or ctx is canceled, and src eventually closes. Cancellation unblocks output sends but does not force-close src. After ctx cancellation, it switches to discard mode: continues reading src but discards all items without flushing partial batches. Batch emission and error forwarding are best-effort during shutdown — cancel may race with a successful send, so output may still appear after cancellation. All drops are reflected in stats. If the consumer stops reading Out and ctx is never canceled, the WeightedBatcher blocks on output delivery and cannot drain src.

Panics if threshold <= 0, weightFn is nil, src is nil, or ctx is nil.

func (*WeightedBatcher[T]) Out

func (b *WeightedBatcher[T]) Out() <-chan rslt.Result[[]T]

Out returns the receive-only output channel. It closes after the source channel closes and all batches have been emitted.

Callers MUST drain Out to completion. If the consumer stops reading, the WeightedBatcher blocks and cannot drain its source, potentially causing upstream deadlocks.

func (*WeightedBatcher[T]) Stats

func (b *WeightedBatcher[T]) Stats() WeightedBatcherStats

Stats returns approximate metrics. See WeightedBatcherStats for caveats. Stats are only reliable as final values after WeightedBatcher.Wait returns.

func (*WeightedBatcher[T]) Wait

func (b *WeightedBatcher[T]) Wait() error

Wait blocks until the WeightedBatcher goroutine exits. Returns ctx.Err() if context cancellation caused items to be dropped (discard mode or interrupted output send), nil otherwise. The WeightedBatcher has no fn, so there is no fail-fast error.

type WeightedBatcherStats

type WeightedBatcherStats struct {
	Received          int64         // individual items consumed from src
	Emitted           int64         // individual Ok items included in emitted batches
	Forwarded         int64         // Err items forwarded downstream
	Dropped           int64         // items lost to shutdown/cancel (includes partial batch items)
	BufferedDepth     int64         // current items in partial batch accumulator
	BufferedWeight    int64         // current accumulated weight in partial batch
	BatchCount        int64         // number of batch results emitted
	OutputBlockedTime time.Duration // cumulative time blocked sending to out
}

WeightedBatcherStats holds metrics for a WeightedBatcher.

Invariant (after Wait): Received = Emitted + Forwarded + Dropped.
