pipz

package module
v1.0.5
Published: Mar 20, 2026 License: MIT Imports: 13 Imported by: 3

README

pipz


Type-safe, composable data pipelines for Go.

Build processing pipelines from simple parts, compose them into complex flows, and get rich error context when things fail.

One Interface To Rule Them All

Every primitive in pipz implements Chainable[T]:

type Chainable[T any] interface {
    Process(context.Context, T) (T, error)
    Identity() Identity
    Schema() Node
    Close() error
}

Processors wrap your functions — the callback signature is the only difference:

// Transform: pure function, no errors
enrich := pipz.Transform(EnrichID, func(ctx context.Context, o Order) Order {
    o.ProcessedAt = time.Now()
    return o
})

// Apply: fallible function
validate := pipz.Apply(ValidateID, func(ctx context.Context, o Order) (Order, error) {
    if o.Total <= 0 {
        return o, errors.New("invalid total")
    }
    return o, nil
})

// Effect: side effect, data passes through unchanged
notify := pipz.Effect(NotifyID, func(ctx context.Context, o Order) error {
    return sendNotification(o.ID)
})

Connectors compose processors — and each other:

// Compose processors into a sequence
flow := pipz.NewSequence(FlowID, validate, enrich, notify)

// Wrap with resilience patterns
resilient := pipz.NewRetry(RetryID, flow, 3)
protected := pipz.NewTimeout(TimeoutID, resilient, 5*time.Second)

// Connectors nest freely — it's Chainable[T] all the way down
pipeline := pipz.NewCircuitBreaker(BreakerID, protected, 5, 30*time.Second)

Install

go get github.com/zoobz-io/pipz

Requires Go 1.24+.

Quick Start

package main

import (
    "context"
    "errors"
    "fmt"
    "strings"
    "time"

    "github.com/zoobz-io/pipz"
)

// Identities for debugging and observability
var (
    ValidateID = pipz.NewIdentity("validate", "Validates order totals")
    EnrichID   = pipz.NewIdentity("enrich", "Adds processing timestamp")
    FormatID   = pipz.NewIdentity("format", "Formats order ID")
    PipelineID = pipz.NewIdentity("order-flow", "Main order pipeline")
)

type Order struct {
    ID          string
    Total       float64
    ProcessedAt time.Time
}

func main() {
    ctx := context.Background()

    // Processors wrap functions
    validate := pipz.Apply(ValidateID, func(_ context.Context, o Order) (Order, error) {
        if o.Total <= 0 {
            return o, errors.New("invalid total")
        }
        return o, nil
    })

    enrich := pipz.Transform(EnrichID, func(_ context.Context, o Order) Order {
        o.ProcessedAt = time.Now()
        return o
    })

    format := pipz.Transform(FormatID, func(_ context.Context, o Order) Order {
        o.ID = strings.ToUpper(o.ID)
        return o
    })

    // Connectors compose processors
    pipeline := pipz.NewSequence(PipelineID, validate, enrich, format)

    // Process
    result, err := pipeline.Process(ctx, Order{ID: "order-123", Total: 99.99})
    if err != nil {
        var pipeErr *pipz.Error[Order]
        if errors.As(err, &pipeErr) {
            fmt.Printf("Failed at %s: %v\n", strings.Join(pipeErr.Path, "->"), pipeErr.Err)
        }
        return
    }

    fmt.Printf("Processed: %s at %v\n", result.ID, result.ProcessedAt)
}

Capabilities

  • Uniform Interface — Everything implements Chainable[T] for seamless composition (docs: Core Concepts)
  • Type-Safe Generics — Full compile-time checking with zero reflection (docs: Architecture)
  • Rich Error Context — Path tracking, timestamps, and input capture on failure (docs: Safety & Reliability)
  • Panic Recovery — Automatic recovery with security-conscious sanitization (docs: Safety & Reliability)
  • Signal Observability — State change events via capitan integration (docs: Hooks)
  • Pipeline Schemas — Schema() exports structure for visualization and debugging (docs: Cheatsheet)

Why pipz?

  • Type-safe — Full compile-time checking with generics
  • Composable — Complex pipelines from simple parts
  • Minimal dependencies — Standard library plus clockz
  • Observable — Typed signals for state changes via capitan
  • Rich errors — Full path tracking shows exactly where failures occur
  • Panic-safe — Automatic recovery with security sanitization

Composable Reliability

Use pipz directly to build secure, observable reliability patterns over your types:

// Your domain type
type Order struct { ... }

// Wrap any operation with resilience
fetch := pipz.Apply(FetchID, fetchOrder)

reliable := pipz.NewSequence(ReliableID,
    pipz.NewRateLimiter[Order](LimiterID, 100, 10),      // throttle
    pipz.NewRetry(RetryID, fetch, 3),                    // retry on failure
    pipz.NewTimeout(TimeoutID, fetch, 5*time.Second),    // enforce deadline
    pipz.NewCircuitBreaker(BreakerID, fetch, 5, 30*time.Second), // prevent cascade
)

// Full error context when things fail
result, err := reliable.Process(ctx, order)

Every connector emits capitan signals — circuit breaker state changes, retry attempts, rate limit hits — observable without instrumentation code.

Extensible Application Vocabulary

Fix T to a domain type and Chainable[T] becomes your API surface:

// Library fixes T to a domain type
type File struct {
    Name     string
    Size     int64
    Data     []byte
    Metadata map[string]string
}

// Library provides domain-specific primitives
func Scan(scanner VirusScanner) pipz.Chainable[*File] { ... }
func Thumbnail(width, height int) pipz.Chainable[*File] { ... }
func Compress(quality int) pipz.Chainable[*File] { ... }
func Upload(storage Storage) pipz.Chainable[*File] { ... }

// Users extend with their own — same interface, first-class citizen
type Watermark struct {
    identity pipz.Identity
    logo     []byte
}
func (w *Watermark) Process(ctx context.Context, f *File) (*File, error) {
    f.Data = applyWatermark(f.Data, w.logo)
    return f, nil
}
func (w *Watermark) Identity() pipz.Identity { return w.identity }
func (w *Watermark) Schema() pipz.Node       { return pipz.Node{Identity: w.identity, Type: "processor"} }
func (w *Watermark) Close() error            { return nil }

// Everything composes — library primitives and user code, indistinguishable
pipeline := pipz.NewSequence(PipelineID,
    Scan(clamav),
    Thumbnail(800, 600),
    &Watermark{identity: pipz.NewIdentity("watermark", ""), logo: logo}, // user's primitive slots right in
    Compress(85),
    Upload(s3),
)

The built-in primitives are the base vocabulary. Users add their own words following the same grammar. The interface IS the API — implement it and express whatever you want.

Documentation

  • Overview — Design philosophy and architecture
  • Learn — Guides and Cookbook
  • Reference:
      • Cheatsheet — Quick reference for all primitives
      • Types — Error, Identity, Node, Schema
Processors

  • Transform — Pure transformation (no errors)
  • Apply — Transformation that may fail
  • Effect — Side effect, passes data through
  • Mutate — Conditional modification
  • Enrich — Best-effort enhancement (errors ignored)

Connectors

  • Sequence — Run in order
  • Concurrent — Run in parallel, collect all results
  • WorkerPool — Bounded parallelism with fixed worker count
  • Scaffold — Fire-and-forget parallel execution
  • Fallback — Try primary, fall back on error
  • Race — First success wins
  • Contest — First result meeting condition wins
  • Switch — Route based on conditions
  • Filter — Conditional execution
  • Retry — Retry on failure
  • Backoff — Retry with exponential delays
  • Timeout — Enforce time limits
  • Handle — Error recovery pipeline
  • RateLimiter — Token bucket rate limiting
  • CircuitBreaker — Prevent cascading failures
  • Pipeline — Execution context for tracing

Contributing

See CONTRIBUTING.md for guidelines. Run make help for available commands.

License

MIT License — see LICENSE for details.

Documentation

Overview

Package pipz provides a lightweight, type-safe library for building composable data processing pipelines in Go.

Overview

pipz enables developers to create clean, testable, and maintainable data processing workflows by composing small, focused functions into larger pipelines. It addresses common challenges in Go applications such as scattered business logic, repetitive error handling, and difficult-to-test code that mixes pure logic with external dependencies.

Installation

go get github.com/zoobz-io/pipz

Requires Go 1.24+ for generic type constraints.

Core Concepts

The library is built around a single, uniform interface:

type Chainable[T any] interface {
    Process(context.Context, T) (T, error)
    Identity() Identity
    Schema() Node
    Close() error
}

Key components:

  • Processors: Individual processing steps created with adapter functions (Transform, Apply, etc.)
  • Connectors: Compose multiple processors into complex flows (Sequence, Switch, Concurrent, etc.)
  • Sequence: The primary way to build sequential pipelines with runtime modification support

Design philosophy:

  • Processors are immutable values (simple functions wrapped with metadata)
  • Connectors are mutable pointers (configurable containers with state)

Everything implements Chainable[T], enabling seamless composition while maintaining type safety through Go generics. Context support provides timeout control and cancellation. Execution follows a fail-fast pattern where processing stops at the first error.

Adapter Functions

Adapters wrap your functions to implement the Chainable interface:

Transform - Pure transformations that cannot fail:

var DoubleName = pipz.NewIdentity("double", "")
double := pipz.Transform(DoubleName, func(_ context.Context, n int) int {
    return n * 2
})

Apply - Operations that can fail:

var ParseJSONName = pipz.NewIdentity("parse", "")
parseJSON := pipz.Apply(ParseJSONName, func(_ context.Context, s string) (Data, error) {
    var d Data
    return d, json.Unmarshal([]byte(s), &d)
})

Effect - Side effects without modifying data:

var LoggerName = pipz.NewIdentity("log", "")
logger := pipz.Effect(LoggerName, func(_ context.Context, d Data) error {
    log.Printf("Processing: %+v", d)
    return nil
})

Mutate - Conditional modifications:

var DiscountName = pipz.NewIdentity("discount", "")
discountPremium := pipz.Mutate(DiscountName,
    func(_ context.Context, u User) User { u.Discount = 0.2; return u },
    func(_ context.Context, u User) bool { return u.IsPremium },
)

Enrich - Optional enhancements that log failures:

var GeoEnrichName = pipz.NewIdentity("geo", "")
addLocation := pipz.Enrich(GeoEnrichName, func(ctx context.Context, u User) (User, error) {
    country, err := detectCountry(u.IP)
    if err != nil {
        return u, err // Enrich ignores the error; pipeline continues with u unchanged
    }
    u.Country = country
    return u, nil
})

Connectors

Connectors compose multiple Chainables. Choose based on your needs:

Sequential Processing:

var PipelineName = pipz.NewIdentity("pipeline", "")
pipeline := pipz.NewSequence(PipelineName, step1, step2, step3)
// Or build dynamically:
var DynamicName = pipz.NewIdentity("dynamic", "")
seq := pipz.NewSequence[T](DynamicName)
seq.Register(step1, step2)
seq.PushTail(step3)  // Add at runtime

Parallel Processing (requires that T implement Cloner[T]):

// Run all processors, return original data
var ParallelName = pipz.NewIdentity("parallel", "")
concurrent := pipz.NewConcurrent(ParallelName, proc1, proc2, proc3)

// Return first successful result
var FastestName = pipz.NewIdentity("fastest", "")
race := pipz.NewRace(FastestName, primary, secondary, tertiary)

// Return first result meeting a condition
var BestName = pipz.NewIdentity("best", "")
contest := pipz.NewContest(BestName, conditionFunc, option1, option2, option3)

Error Handling:

// Try fallback on error
var SafeName = pipz.NewIdentity("safe", "")
fallback := pipz.NewFallback(SafeName, primary, backup)

// Retry with attempts
var ResilientName = pipz.NewIdentity("resilient", "")
retry := pipz.NewRetry(ResilientName, processor, 3)

// Retry with exponential backoff
var ApiCallName = pipz.NewIdentity("api-call", "")
backoff := pipz.NewBackoff(ApiCallName, processor, 5, time.Second)

// Handle errors without changing data flow
var ObservedName = pipz.NewIdentity("observed", "")
handle := pipz.NewHandle(ObservedName, processor, errorPipeline)

Control Flow:

// Route based on conditions
var RouterName = pipz.NewIdentity("router", "")
router := pipz.NewSwitch(RouterName, func(ctx context.Context, d Data) string {
    if d.Type == "premium" { return "premium-flow" }
    return "standard-flow"
})
router.AddRoute("premium-flow", premiumProcessor)
router.AddRoute("standard-flow", standardProcessor)

// Enforce timeouts
var DeadlineName = pipz.NewIdentity("deadline", "")
timeout := pipz.NewTimeout(DeadlineName, processor, 30*time.Second)

// Conditional processing
var FeatureFlagName = pipz.NewIdentity("feature-flag", "")
filter := pipz.NewFilter(FeatureFlagName,
    func(ctx context.Context, u User) bool { return u.BetaEnabled },
    betaProcessor,
)

Resource Protection:

// Rate limiting
var ApiLimitName = pipz.NewIdentity("api-limit", "")
rateLimiter := pipz.NewRateLimiter[Data](ApiLimitName, 100, 10) // 100/sec, burst 10 (T must be given explicitly)
rateLimiter.SetMode("drop") // Or "wait" (default)

// Circuit breaker
var ServiceBreakerName = pipz.NewIdentity("service-breaker", "")
breaker := pipz.NewCircuitBreaker(ServiceBreakerName, processor, 5, 30*time.Second)

Quick Start

Simple example - transform strings through a pipeline:

package main

import (
    "context"
    "strings"
    "github.com/zoobz-io/pipz"
)

func main() {
    // Define processor identities (Go constants cannot hold function results, so use var)
    var (
        TrimName          = pipz.NewIdentity("trim", "")
        UpperName         = pipz.NewIdentity("uppercase", "")
        TextProcessorName = pipz.NewIdentity("text-processor", "")
    )

    // Create processors
    trim := pipz.Transform(TrimName, func(_ context.Context, s string) string {
        return strings.TrimSpace(s)
    })
    upper := pipz.Transform(UpperName, func(_ context.Context, s string) string {
        return strings.ToUpper(s)
    })

    // Method 1: Direct composition
    pipeline := pipz.NewSequence(TextProcessorName, trim, upper)

    // Method 2: Build dynamically
    sequence := pipz.NewSequence[string](TextProcessorName)
    sequence.Register(trim, upper)
    _ = sequence // equivalent to the pipeline above

    // Execute
    result, err := pipeline.Process(context.Background(), "  hello world  ")
    _, _ = result, err // result: "HELLO WORLD", err: nil
}

Implementing Cloner[T]

For parallel processing with Concurrent or Race, types must implement Cloner[T]:

type Order struct {
    ID    string
    Items []Item        // Slice needs copying
    Meta  map[string]any // Map needs copying
}

func (o Order) Clone() Order {
    // Deep copy slice
    items := make([]Item, len(o.Items))
    for i, item := range o.Items {
        items[i] = item.Clone() // If Item also has references
    }

    // Deep copy map
    meta := make(map[string]any, len(o.Meta))
    for k, v := range o.Meta {
        meta[k] = v // Adjust based on value types
    }

    return Order{ID: o.ID, Items: items, Meta: meta}
}

Choosing the Right Connector

  • Sequence: Default choice for step-by-step processing; also supports runtime modification
  • Switch: For conditional routing based on data
  • Filter: For conditional processing (execute or skip)
  • Concurrent: For parallel independent operations (requires Cloner[T])
  • Race: When you need the fastest result
  • Contest: When you need the fastest result that meets criteria
  • Fallback: For primary/backup patterns
  • Retry/Backoff: For handling transient failures
  • Timeout: For operations that might hang
  • Handle: For error monitoring without changing flow
  • RateLimiter: For protecting rate-limited resources
  • CircuitBreaker: For preventing cascade failures

Error Handling

pipz provides rich error information through the Error[T] type:

type Error[T any] struct {
    Path      []Identity    // Full path of Identity values through the pipeline
    InputData T             // The input that caused the failure
    Err       error         // The underlying error
    Timestamp time.Time     // When the error occurred
    Duration  time.Duration // How long before failure
    Timeout   bool          // Was it a timeout?
    Canceled  bool          // Was it canceled?
}

Error handling example:

result, err := pipeline.Process(ctx, data)
if err != nil {
    var pipeErr *pipz.Error[Data]
    if errors.As(err, &pipeErr) {
        // pipeErr.Error() formats the path automatically
        log.Printf("Pipeline error: %v", pipeErr)
        log.Printf("Input data: %+v", pipeErr.InputData)
        log.Printf("After: %v", pipeErr.Duration)

        if pipeErr.Timeout {
            // Handle timeout specifically
        }
    }
}

Performance

pipz is designed for exceptional performance:

  • Transform: 2.7ns per operation with zero allocations
  • Apply/Effect (success): 46ns per operation with zero allocations
  • Basic pipeline overhead: ~88 bytes, 3 allocations (constant regardless of length)
  • Linear scaling: 5-step pipeline ~560ns, 50-step pipeline ~2.8μs
  • No reflection or runtime type assertions
  • Predictable performance characteristics

See PERFORMANCE.md for detailed benchmarks.

Best Practices

  1. Keep processors small and focused on a single responsibility
  2. Use descriptive names for processors to aid debugging
  3. Implement Cloner[T] correctly for types used with Concurrent/Race
  4. Use NewSequence() for both static and dynamic pipelines
  5. Check context.Err() in long-running processors
  6. Let errors bubble up - handle at pipeline level
  7. Use Effect for side effects to maintain purity
  8. Test processors in isolation before composing
  9. Prefer Transform over Apply when errors aren't possible
  10. Use timeouts at the pipeline level, not individual processors

Common Patterns

Validation Pipeline:

var (
    ValidationName = pipz.NewIdentity("validation", "")
    RequiredName   = pipz.NewIdentity("required", "")
    FormatName     = pipz.NewIdentity("format", "")
    SanitizeName   = pipz.NewIdentity("sanitize", "")
)

validation := pipz.NewSequence(ValidationName,
    pipz.Effect(RequiredName, checkRequired),
    pipz.Effect(FormatName, checkFormat),
    pipz.Apply(SanitizeName, sanitizeInput),
)

API with Retry and Timeout:

var (
    ApiTimeoutName = pipz.NewIdentity("api-timeout", "")
    ApiRetryName   = pipz.NewIdentity("api-retry", "")
    FetchName      = pipz.NewIdentity("fetch", "")
)

apiCall := pipz.NewTimeout(ApiTimeoutName,
    pipz.NewBackoff(ApiRetryName,
        pipz.Apply(FetchName, fetchFromAPI),
        3, time.Second,
    ),
    30*time.Second,
)

Multi-path Processing:

var TypeRouterName = pipz.NewIdentity("type-router", "")
processor := pipz.NewSwitch(TypeRouterName, detectType)
processor.AddRoute("json", jsonProcessor)
processor.AddRoute("xml", xmlProcessor)
processor.AddRoute("csv", csvProcessor)

For more examples, see the examples directory.

Index

Constants

This section is empty.

Variables

var (
	SequenceKey       = FlowKey[SequenceFlow]{/* contains filtered or unexported fields */}
	FallbackKey       = FlowKey[FallbackFlow]{/* contains filtered or unexported fields */}
	RaceKey           = FlowKey[RaceFlow]{/* contains filtered or unexported fields */}
	ContestKey        = FlowKey[ContestFlow]{/* contains filtered or unexported fields */}
	ConcurrentKey     = FlowKey[ConcurrentFlow]{/* contains filtered or unexported fields */}
	SwitchKey         = FlowKey[SwitchFlow]{/* contains filtered or unexported fields */}
	FilterKey         = FlowKey[FilterFlow]{/* contains filtered or unexported fields */}
	HandleKey         = FlowKey[HandleFlow]{/* contains filtered or unexported fields */}
	ScaffoldKey       = FlowKey[ScaffoldFlow]{/* contains filtered or unexported fields */}
	BackoffKey        = FlowKey[BackoffFlow]{/* contains filtered or unexported fields */}
	RetryKey          = FlowKey[RetryFlow]{/* contains filtered or unexported fields */}
	TimeoutKey        = FlowKey[TimeoutFlow]{/* contains filtered or unexported fields */}
	RateLimiterKey    = FlowKey[RateLimiterFlow]{/* contains filtered or unexported fields */}
	CircuitBreakerKey = FlowKey[CircuitBreakerFlow]{/* contains filtered or unexported fields */}
	WorkerpoolKey     = FlowKey[WorkerpoolFlow]{/* contains filtered or unexported fields */}
	PipelineKey       = FlowKey[PipelineFlow]{/* contains filtered or unexported fields */}
)

Pre-defined FlowKeys for each flow type.

var (
	ErrIndexOutOfBounds = errors.New("index out of bounds")
	ErrEmptySequence    = errors.New("sequence is empty")
	ErrInvalidRange     = errors.New("invalid range")
)

Sequence modification errors.

var (
	// CircuitBreaker signals.
	SignalCircuitBreakerOpened = capitan.NewSignal(
		"circuitbreaker.opened",
		"Circuit breaker has transitioned to open state due to exceeding failure threshold",
	)
	SignalCircuitBreakerClosed = capitan.NewSignal(
		"circuitbreaker.closed",
		"Circuit breaker has transitioned to closed state after successful recovery",
	)
	SignalCircuitBreakerHalfOpen = capitan.NewSignal(
		"circuitbreaker.half-open",
		"Circuit breaker has transitioned to half-open state to test if the issue has resolved",
	)
	SignalCircuitBreakerRejected = capitan.NewSignal(
		"circuitbreaker.rejected",
		"Circuit breaker rejected a request because it is in open state",
	)

	// RateLimiter signals.
	SignalRateLimiterThrottled = capitan.NewSignal(
		"ratelimiter.throttled",
		"Rate limiter delayed a request to comply with rate limits",
	)
	SignalRateLimiterDropped = capitan.NewSignal(
		"ratelimiter.dropped",
		"Rate limiter rejected a request because rate limit was exceeded and drop mode is enabled",
	)
	SignalRateLimiterAllowed = capitan.NewSignal(
		"ratelimiter.allowed",
		"Rate limiter allowed a request to proceed",
	)

	// WorkerPool signals.
	SignalWorkerPoolSaturated = capitan.NewSignal(
		"workerpool.saturated",
		"Worker pool has reached maximum capacity and is waiting for available workers",
	)
	SignalWorkerPoolAcquired = capitan.NewSignal(
		"workerpool.acquired",
		"Worker pool acquired a worker slot for processing",
	)
	SignalWorkerPoolReleased = capitan.NewSignal(
		"workerpool.released",
		"Worker pool released a worker slot after processing completed",
	)

	// Retry signals.
	SignalRetryAttemptStart = capitan.NewSignal(
		"retry.attempt-start",
		"Retry connector is starting an execution attempt",
	)
	SignalRetryAttemptFail = capitan.NewSignal(
		"retry.attempt-fail",
		"Retry connector attempt failed and will be retried if attempts remain",
	)
	SignalRetryExhausted = capitan.NewSignal(
		"retry.exhausted",
		"Retry connector has exhausted all retry attempts and is failing",
	)

	// Fallback signals.
	SignalFallbackAttempt = capitan.NewSignal(
		"fallback.attempt",
		"Fallback connector is attempting to execute a processor in the fallback chain",
	)
	SignalFallbackFailed = capitan.NewSignal(
		"fallback.failed",
		"Fallback connector exhausted all processors without success",
	)

	// Timeout signals.
	SignalTimeoutTriggered = capitan.NewSignal(
		"timeout.triggered",
		"Timeout connector canceled execution because the deadline was exceeded",
	)

	// Backoff signals.
	SignalBackoffWaiting = capitan.NewSignal(
		"backoff.waiting",
		"Backoff connector is delaying before the next execution attempt",
	)

	// Sequence signals.
	SignalSequenceCompleted = capitan.NewSignal(
		"sequence.completed",
		"Sequence connector completed processing all processors successfully",
	)

	// Concurrent signals.
	SignalConcurrentCompleted = capitan.NewSignal(
		"concurrent.completed",
		"Concurrent connector completed all parallel processors",
	)

	// Race signals.
	SignalRaceWinner = capitan.NewSignal(
		"race.winner",
		"Race connector determined a winner from parallel processors",
	)

	// Contest signals.
	SignalContestWinner = capitan.NewSignal(
		"contest.winner",
		"Contest connector found a result meeting the condition",
	)

	// Scaffold signals.
	SignalScaffoldDispatched = capitan.NewSignal(
		"scaffold.dispatched",
		"Scaffold connector dispatched processors for background execution",
	)

	// Switch signals.
	SignalSwitchRouted = capitan.NewSignal(
		"switch.routed",
		"Switch connector routed data to a processor based on condition",
	)

	// Filter signals.
	SignalFilterEvaluated = capitan.NewSignal(
		"filter.evaluated",
		"Filter connector evaluated condition and determined whether to process",
	)

	// Handle signals.
	SignalHandleErrorHandled = capitan.NewSignal(
		"handle.error-handled",
		"Handle connector processed an error through the error handler",
	)
)

Signal definitions for pipz connector events. Signals follow the pattern: <connector-type>.<event>.

var (
	// Identity tracking field.
	FieldIdentityID = capitan.NewStringKey("identity_id") // UUID of the component

	// Common fields.
	FieldName      = capitan.NewStringKey("name")       // Connector instance name
	FieldError     = capitan.NewStringKey("error")      // Error message
	FieldTimestamp = capitan.NewFloat64Key("timestamp") // Unix timestamp

	// CircuitBreaker fields.
	FieldState            = capitan.NewStringKey("state")           // Circuit state: closed/open/half-open
	FieldFailures         = capitan.NewIntKey("failures")           // Current failure count
	FieldSuccesses        = capitan.NewIntKey("successes")          // Current success count
	FieldFailureThreshold = capitan.NewIntKey("failure_threshold")  // Threshold to open
	FieldSuccessThreshold = capitan.NewIntKey("success_threshold")  // Threshold to close from half-open
	FieldResetTimeout     = capitan.NewFloat64Key("reset_timeout")  // Reset timeout in seconds
	FieldGeneration       = capitan.NewIntKey("generation")         // Circuit generation number
	FieldLastFailTime     = capitan.NewFloat64Key("last_fail_time") // Last failure timestamp

	// RateLimiter fields.
	FieldRate     = capitan.NewFloat64Key("rate")      // Requests per second
	FieldBurst    = capitan.NewIntKey("burst")         // Burst capacity
	FieldTokens   = capitan.NewFloat64Key("tokens")    // Current tokens
	FieldMode     = capitan.NewStringKey("mode")       // Mode: wait/drop
	FieldWaitTime = capitan.NewFloat64Key("wait_time") // Wait time in seconds

	// WorkerPool fields.
	FieldWorkerCount   = capitan.NewIntKey("worker_count")   // Total worker slots
	FieldActiveWorkers = capitan.NewIntKey("active_workers") // Currently active workers

	// Retry fields.
	FieldAttempt     = capitan.NewIntKey("attempt")      // Current attempt number
	FieldMaxAttempts = capitan.NewIntKey("max_attempts") // Maximum attempts

	// Fallback fields.
	FieldProcessorIndex = capitan.NewIntKey("processor_index")   // Index of processor being tried
	FieldProcessorName  = capitan.NewStringKey("processor_name") // Name of processor being tried

	// Timeout fields.
	FieldDuration = capitan.NewFloat64Key("duration") // Timeout duration in seconds

	// Backoff fields.
	FieldDelay     = capitan.NewFloat64Key("delay")      // Current backoff delay in seconds
	FieldNextDelay = capitan.NewFloat64Key("next_delay") // Next delay if this attempt fails in seconds

	// Sequence fields.
	FieldProcessorCount = capitan.NewIntKey("processor_count") // Number of processors in chain

	// Concurrent fields.
	FieldErrorCount = capitan.NewIntKey("error_count") // Number of errors from parallel execution

	// Race/Contest fields.
	FieldWinnerName = capitan.NewStringKey("winner_name") // Name of winning processor

	// Switch fields.
	FieldRouteKey = capitan.NewStringKey("route_key") // Key used for routing
	FieldMatched  = capitan.NewBoolKey("matched")     // Whether a route was matched

	// Filter fields.
	FieldPassed = capitan.NewBoolKey("passed") // Whether filter condition passed
)

Common field keys using capitan primitive types. All keys use primitive types to avoid custom struct serialization.

Functions

func ExecutionIDFromContext

func ExecutionIDFromContext(ctx context.Context) (uuid.UUID, bool)

ExecutionIDFromContext extracts the execution ID from context. Returns the execution UUID and true if present, or uuid.Nil and false otherwise.

func PipelineIDFromContext

func PipelineIDFromContext(ctx context.Context) (uuid.UUID, bool)

PipelineIDFromContext extracts the pipeline ID from context. Returns the pipeline UUID and true if present, or uuid.Nil and false otherwise.

Types

type Backoff

type Backoff[T any] struct {
	// contains filtered or unexported fields
}

Backoff attempts the processor with exponential backoff between attempts. Backoff adds intelligent spacing between retry attempts, starting with baseDelay and doubling after each failure. This prevents overwhelming failed services and allows time for transient issues to resolve.

The exponential backoff pattern (delay, 2*delay, 4*delay, ...) is widely used for its effectiveness in handling various failure scenarios without overwhelming systems. The operation can be canceled via context during waits.

Ideal for:

  • API calls to rate-limited services
  • Database operations during high load
  • Distributed system interactions
  • Any operation where immediate retry is counterproductive

The total time spent can be significant with multiple retries. For example, with baseDelay=1s and maxAttempts=5:

Delays: 1s, 2s, 4s, 8s (total wait: 15s plus processing time)

func NewBackoff

func NewBackoff[T any](identity Identity, processor Chainable[T], maxAttempts int, baseDelay time.Duration) *Backoff[T]

NewBackoff creates a new Backoff connector.

func (*Backoff[T]) Close

func (b *Backoff[T]) Close() error

Close gracefully shuts down the connector and its child processor. Close is idempotent - multiple calls return the same result.

func (*Backoff[T]) GetBaseDelay

func (b *Backoff[T]) GetBaseDelay() time.Duration

GetBaseDelay returns the current base delay setting.

func (*Backoff[T]) GetMaxAttempts

func (b *Backoff[T]) GetMaxAttempts() int

GetMaxAttempts returns the current maximum attempts setting.

func (*Backoff[T]) Identity

func (b *Backoff[T]) Identity() Identity

Identity returns the identity of this connector.

func (*Backoff[T]) Process

func (b *Backoff[T]) Process(ctx context.Context, data T) (result T, err error)

Process implements the Chainable interface.

func (*Backoff[T]) Schema

func (b *Backoff[T]) Schema() Node

Schema returns a Node representing this connector in the pipeline schema.

func (*Backoff[T]) SetBaseDelay

func (b *Backoff[T]) SetBaseDelay(d time.Duration) *Backoff[T]

SetBaseDelay updates the base delay duration.

func (*Backoff[T]) SetMaxAttempts

func (b *Backoff[T]) SetMaxAttempts(n int) *Backoff[T]

SetMaxAttempts updates the maximum number of retry attempts.

func (*Backoff[T]) WithClock

func (b *Backoff[T]) WithClock(clock clockz.Clock) *Backoff[T]

WithClock sets a custom clock for testing.

type BackoffFlow

type BackoffFlow struct {
	Processor Node `json:"processor"`
}

BackoffFlow represents processing with exponential backoff retry.

func (BackoffFlow) Variant

func (BackoffFlow) Variant() FlowVariant

Variant implements Flow.

type Chainable

type Chainable[T any] interface {
	Process(context.Context, T) (T, error)
	Identity() Identity
	Schema() Node
	Close() error
}

Chainable defines the interface for any component that can process values of type T. This interface enables composition of different processing components that operate on the same type.

Chainable is the foundation of pipz - every processor, pipeline, and connector implements this interface. The uniform interface enables seamless composition while maintaining type safety through Go generics.

Key design principles:

  • Context support for timeout and cancellation
  • Type safety through generics (no interface{})
  • Error propagation for fail-fast behavior
  • Immutable by convention (return modified copies)
  • Identity-based components for debugging, monitoring, and visualization

type CircuitBreaker

type CircuitBreaker[T any] struct {
	// contains filtered or unexported fields
}

CircuitBreaker prevents cascading failures by stopping requests to failing services. It implements the circuit breaker pattern with three states:

  • Closed: Normal operation, requests pass through
  • Open: Requests fail immediately without calling the wrapped processor
  • Half-Open: Testing state, limited requests to check if service recovered

CRITICAL: CircuitBreaker is a STATEFUL connector that tracks failure counts across requests. Create it once and reuse it - do NOT create a new CircuitBreaker for each request, as that would reset the failure count and the circuit would never open.

❌ WRONG - Creating per request (never opens):

func handleRequest(req Request) Response {
    breaker := pipz.NewCircuitBreaker(APIBreakerID, proc, 5, 30*time.Second)  // NEW breaker!
    return breaker.Process(ctx, req)  // Always closed, failure count always 0
}

✅ RIGHT - Create once, reuse:

var apiBreaker = pipz.NewCircuitBreaker(APIBreakerID, apiProcessor, 5, 30*time.Second)

func handleRequest(req Request) Response {
    return apiBreaker.Process(ctx, req)  // Tracks failures across requests
}

The circuit opens after consecutive failures reach the threshold. After a timeout period, it transitions to half-open to test recovery. Successful requests in half-open state close the circuit, while failures reopen it.

CircuitBreaker is essential for:

  • Preventing cascade failures in distributed systems
  • Giving failing services time to recover
  • Failing fast when services are down
  • Reducing unnecessary load on struggling services
  • Improving overall system resilience

Best Practices:

  • Define Identity values once as package variables (see best-practices.md)
  • Create CircuitBreakers once and reuse them (e.g., as struct fields or package variables)
  • Set thresholds based on service characteristics
  • Combine with RateLimiter for comprehensive protection
  • Monitor circuit state for operational awareness

Example:

// Define identities once
var (
    APIBreakerID = pipz.NewIdentity("api-breaker", "Protects the external API")
    DBBreakerID  = pipz.NewIdentity("db-breaker", "Protects the database")
    APICallID    = pipz.NewIdentity("api-call", "Calls the external API")
    DBQueryID    = pipz.NewIdentity("db-query", "Queries the database")
)

// Create breakers once and reuse
var (
    // External API - fail fast, longer recovery
    apiBreaker = pipz.NewCircuitBreaker(
        APIBreakerID,
        pipz.Apply(APICallID, callExternalAPI),
        5,                    // Open after 5 failures
        30 * time.Second,     // Try recovery after 30s
    )

    // Internal database - more tolerant
    dbBreaker = pipz.NewCircuitBreaker(
        DBBreakerID,
        pipz.Apply(DBQueryID, queryDatabase),
        10,                   // Open after 10 failures
        10 * time.Second,     // Try recovery after 10s
    )
)

// Combine with rate limiting for full protection
var (
    ResilientPipelineID = pipz.NewIdentity("resilient-pipeline", "Rate limit, break, retry")
    RetryID             = pipz.NewIdentity("retry", "Retries transient failures")
)

func createResilientPipeline() pipz.Chainable[Request] {
    return pipz.NewSequence(ResilientPipelineID,
        rateLimiter,    // Protect downstream from overload
        apiBreaker,     // Fail fast if service is down
        pipz.NewRetry(RetryID, processor, 3),  // Retry transient failures
    )
}

func NewCircuitBreaker

func NewCircuitBreaker[T any](identity Identity, processor Chainable[T], failureThreshold int, resetTimeout time.Duration) *CircuitBreaker[T]

NewCircuitBreaker creates a new CircuitBreaker connector. The failureThreshold sets how many consecutive failures trigger opening. The resetTimeout sets how long to wait before attempting recovery.

func (*CircuitBreaker[T]) Close

func (cb *CircuitBreaker[T]) Close() error

Close gracefully shuts down the connector and its child processor. Close is idempotent - multiple calls return the same result.

func (*CircuitBreaker[T]) GetFailureThreshold

func (cb *CircuitBreaker[T]) GetFailureThreshold() int

GetFailureThreshold returns the current failure threshold.

func (*CircuitBreaker[T]) GetResetTimeout

func (cb *CircuitBreaker[T]) GetResetTimeout() time.Duration

GetResetTimeout returns the current reset timeout.

func (*CircuitBreaker[T]) GetState

func (cb *CircuitBreaker[T]) GetState() string

GetState returns the current circuit state.

func (*CircuitBreaker[T]) GetSuccessThreshold

func (cb *CircuitBreaker[T]) GetSuccessThreshold() int

GetSuccessThreshold returns the current success threshold.

func (*CircuitBreaker[T]) Identity

func (cb *CircuitBreaker[T]) Identity() Identity

Identity returns the identity of this connector.

func (*CircuitBreaker[T]) Process

func (cb *CircuitBreaker[T]) Process(ctx context.Context, data T) (result T, err error)

Process implements the Chainable interface.

func (*CircuitBreaker[T]) Reset

func (cb *CircuitBreaker[T]) Reset() *CircuitBreaker[T]

Reset manually resets the circuit to closed state.

func (*CircuitBreaker[T]) Schema

func (cb *CircuitBreaker[T]) Schema() Node

Schema returns a Node representing this connector in the pipeline schema.

func (*CircuitBreaker[T]) SetFailureThreshold

func (cb *CircuitBreaker[T]) SetFailureThreshold(n int) *CircuitBreaker[T]

SetFailureThreshold updates the consecutive failures needed to open the circuit.

func (*CircuitBreaker[T]) SetResetTimeout

func (cb *CircuitBreaker[T]) SetResetTimeout(d time.Duration) *CircuitBreaker[T]

SetResetTimeout updates the time to wait before attempting recovery.

func (*CircuitBreaker[T]) SetSuccessThreshold

func (cb *CircuitBreaker[T]) SetSuccessThreshold(n int) *CircuitBreaker[T]

SetSuccessThreshold updates the successes needed to close from half-open state.

func (*CircuitBreaker[T]) WithClock

func (cb *CircuitBreaker[T]) WithClock(clock clockz.Clock) *CircuitBreaker[T]

WithClock sets a custom clock for testing.

type CircuitBreakerFlow

type CircuitBreakerFlow struct {
	Processor Node `json:"processor"`
}

CircuitBreakerFlow represents processing with circuit breaker protection.

func (CircuitBreakerFlow) Variant

func (CircuitBreakerFlow) Variant() FlowVariant

Variant implements Flow.

type Cloner

type Cloner[T any] interface {
	Clone() T
}

Cloner is an interface for types that can create deep copies of themselves. Implementing this interface is required to use types with Concurrent and Race connectors, providing a type-safe and performant alternative to reflection-based copying.

The Clone method must return a deep copy where modifications to the clone do not affect the original value. For types containing pointers, slices, or maps, ensure these are also copied to achieve true isolation between concurrent processors.

Example implementation:

type Order struct {
    ID       string
    Items    []Item
    Status   string
    Metadata map[string]string
}

func (o Order) Clone() Order {
    // Deep copy slice
    items := make([]Item, len(o.Items))
    copy(items, o.Items)

    // Deep copy map
    metadata := make(map[string]string, len(o.Metadata))
    for k, v := range o.Metadata {
        metadata[k] = v
    }

    return Order{
        ID:       o.ID,
        Items:    items,
        Status:   o.Status,
        Metadata: metadata,
    }
}

type Concurrent

type Concurrent[T Cloner[T]] struct {
	// contains filtered or unexported fields
}

Concurrent runs all processors in parallel with the original context preserved. This connector passes the original context directly to each processor, preserving distributed tracing information, spans, and other context values. Each processor receives a deep copy of the input, ensuring complete isolation.

Concurrent supports two modes:

  • Without reducer (nil): Returns the original input unchanged after all processors complete
  • With reducer: Collects all results and errors, then calls the reducer function to produce the final output

The input type T must implement the Cloner[T] interface to provide efficient, type-safe copying without reflection. This ensures predictable performance and allows types to control their own copying semantics.

Use Concurrent when you need:

  • Distributed tracing to work across concurrent operations
  • All processors to respect the original context's cancellation
  • To wait for all processors to complete before continuing
  • Multiple side effects to happen simultaneously
  • To aggregate results from parallel operations (with reducer)

Common use cases:

  • Sending traced notifications to multiple channels
  • Updating multiple external systems with trace context
  • Parallel logging with trace IDs preserved
  • Fetching data from multiple sources and merging results
  • Operations that must all complete or be canceled together

Important characteristics:

  • Input type must implement Cloner[T] interface
  • All processors run regardless of individual failures
  • Context cancellation immediately affects all processors
  • Preserves trace context and spans for distributed tracing
  • Waits for all processors to complete
  • Reducer receives map[Identity]T for results and map[Identity]error for errors (keyed by processor Identity)

Example without reducer (side effects):

type Order struct {
    ID     string
    Items  []Item
    Status string
}

func (o Order) Clone() Order {
    items := make([]Item, len(o.Items))
    copy(items, o.Items)
    return Order{
        ID:     o.ID,
        Items:  items,
        Status: o.Status,
    }
}

var NotifyOrderID = pipz.NewIdentity("notify-order", "Sends notifications for order")
concurrent := pipz.NewConcurrent(
    NotifyOrderID,
    nil, // no reducer, just run side effects
    sendEmailNotification,
    sendSMSNotification,
    updateInventorySystem,
    logToAnalytics,
)

Example with reducer (aggregate results):

type PriceCheck struct {
    ProductID string
    BestPrice float64
}

func (p PriceCheck) Clone() PriceCheck {
    return p
}

reducer := func(original PriceCheck, results map[pipz.Identity]PriceCheck, errors map[pipz.Identity]error) PriceCheck {
    bestPrice := original.BestPrice
    for _, result := range results {
        if result.BestPrice < bestPrice {
            bestPrice = result.BestPrice
        }
    }
    return PriceCheck{ProductID: original.ProductID, BestPrice: bestPrice}
}

var CheckPricesID = pipz.NewIdentity("check-prices", "Checks prices across vendors")
concurrent := pipz.NewConcurrent(
    CheckPricesID,
    reducer,
    checkAmazon,
    checkWalmart,
    checkTarget,
)

func NewConcurrent

func NewConcurrent[T Cloner[T]](identity Identity, reducer func(original T, results map[Identity]T, errors map[Identity]error) T, processors ...Chainable[T]) *Concurrent[T]

NewConcurrent creates a new Concurrent connector. If reducer is nil, the original input is returned unchanged. If reducer is provided, it receives the original input, all processor results, and any errors (keyed by processor Identity), allowing you to aggregate or merge results into a new T.

func (*Concurrent[T]) Add

func (c *Concurrent[T]) Add(processor Chainable[T]) *Concurrent[T]

Add appends a processor to the concurrent execution list.

func (*Concurrent[T]) Clear

func (c *Concurrent[T]) Clear() *Concurrent[T]

Clear removes all processors from the concurrent execution list.

func (*Concurrent[T]) Close

func (c *Concurrent[T]) Close() error

Close gracefully shuts down the connector and all its child processors. Close is idempotent - multiple calls return the same result.

func (*Concurrent[T]) Identity

func (c *Concurrent[T]) Identity() Identity

Identity returns the identity of this connector.

func (*Concurrent[T]) Len

func (c *Concurrent[T]) Len() int

Len returns the number of processors.

func (*Concurrent[T]) Process

func (c *Concurrent[T]) Process(ctx context.Context, input T) (result T, err error)

Process implements the Chainable interface.

func (*Concurrent[T]) Remove

func (c *Concurrent[T]) Remove(index int) error

Remove removes the processor at the specified index.

func (*Concurrent[T]) Schema

func (c *Concurrent[T]) Schema() Node

Schema returns a Node representing this connector in the pipeline schema.

func (*Concurrent[T]) SetProcessors

func (c *Concurrent[T]) SetProcessors(processors ...Chainable[T]) *Concurrent[T]

SetProcessors replaces all processors atomically.

type ConcurrentFlow

type ConcurrentFlow struct {
	Tasks []Node `json:"tasks"`
}

ConcurrentFlow represents parallel execution of independent tasks. All tasks run simultaneously; their results are combined into a single output.

func (ConcurrentFlow) Variant

func (ConcurrentFlow) Variant() FlowVariant

Variant implements Flow.

type Condition

type Condition[T any] func(context.Context, T) string

Condition determines routing based on input data. Returns a route key string for multi-way branching.

Define string constants for type-safe routing:

const (
    RouteStandard   = "standard"
    RouteHighValue  = "high_value"
    RouteCrypto     = "crypto"
)

Common patterns include routing by:

  • Status strings for workflow states
  • Region identifiers
  • Priority levels as strings
  • Feature flag names

type Contest

type Contest[T Cloner[T]] struct {
	// contains filtered or unexported fields
}

Contest runs all processors in parallel and returns the first result that meets a specified condition. Contest combines competitive processing (like Race) with conditional selection, allowing you to define what makes a "winner" beyond just being first to complete.

Context handling: Contest uses context.WithCancel(ctx) to create a derived context that preserves all parent context values (including trace IDs) while allowing cancellation of other processors when a winner meeting the condition is found.

The input type T must implement the Cloner[T] interface to provide efficient, type-safe copying without reflection. This ensures predictable performance and allows types to control their own copying semantics.

This pattern excels when you have multiple ways to get a result and want the fastest one that meets specific criteria:

  • Finding the cheapest shipping rate under a time constraint
  • Getting the first API response with required data completeness
  • Querying multiple sources for the best quality result quickly
  • Racing services where the "best" result matters more than just "first"
  • Any scenario where you need speed AND quality criteria

Key behaviors:

  • First result meeting the condition wins and cancels others
  • If no results meet the condition, returns the original input with an error
  • Each processor gets an isolated copy via Clone()
  • Condition is evaluated as results arrive (no waiting for all)
  • Can reduce latency while ensuring quality constraints

Example:

// Find the first shipping rate under $50
var CheapestRateID = pipz.NewIdentity("cheapest-rate", "First shipping rate under $50 wins")
contest := pipz.NewContest(CheapestRateID,
    func(_ context.Context, rate Rate) bool {
        return rate.Cost < 50.00
    },
    fedexRates,
    upsRates,
    uspsRates,
)

func NewContest

func NewContest[T Cloner[T]](identity Identity, condition func(context.Context, T) bool, processors ...Chainable[T]) *Contest[T]

NewContest creates a new Contest connector with the specified winning condition. The condition function determines which results are acceptable winners. A result must both complete successfully AND meet the condition to win.

func (*Contest[T]) Add

func (c *Contest[T]) Add(processor Chainable[T]) *Contest[T]

Add appends a processor to the contest execution list.

func (*Contest[T]) Clear

func (c *Contest[T]) Clear() *Contest[T]

Clear removes all processors from the contest execution list.

func (*Contest[T]) Close

func (c *Contest[T]) Close() error

Close gracefully shuts down the connector and all its child processors. Close is idempotent - multiple calls return the same result.

func (*Contest[T]) Identity

func (c *Contest[T]) Identity() Identity

Identity returns the identity of this connector.

func (*Contest[T]) Len

func (c *Contest[T]) Len() int

Len returns the number of processors.

func (*Contest[T]) Process

func (c *Contest[T]) Process(ctx context.Context, input T) (result T, err error)

Process implements the Chainable interface.

func (*Contest[T]) Remove

func (c *Contest[T]) Remove(index int) error

Remove removes the processor at the specified index.

func (*Contest[T]) Schema

func (c *Contest[T]) Schema() Node

Schema returns a Node representing this connector in the pipeline schema.

func (*Contest[T]) SetCondition

func (c *Contest[T]) SetCondition(condition func(context.Context, T) bool) *Contest[T]

SetCondition updates the winning condition. This allows changing the criteria at runtime.

func (*Contest[T]) SetProcessors

func (c *Contest[T]) SetProcessors(processors ...Chainable[T]) *Contest[T]

SetProcessors replaces all processors atomically.

type ContestFlow

type ContestFlow struct {
	Competitors []Node `json:"competitors"`
}

ContestFlow represents parallel execution with conditional selection. Competitors run in parallel; the first result meeting the condition wins and cancels the rest.

func (ContestFlow) Variant

func (ContestFlow) Variant() FlowVariant

Variant implements Flow.

type Error

type Error[T any] struct {
	Timestamp time.Time
	InputData T
	Err       error
	Path      []Identity
	Duration  time.Duration
	Timeout   bool
	Canceled  bool
}

Error provides rich context about pipeline execution failures. It wraps the underlying error with information about where and when the failure occurred, what data was being processed, and the complete path through the processing chain.

The Path field contains Identity values, enabling correlation between error paths and schema definitions via the Identity.ID() UUIDs.

func (*Error[T]) Error

func (e *Error[T]) Error() string

Error implements the error interface, providing a detailed error message.

func (*Error[T]) IsCanceled

func (e *Error[T]) IsCanceled() bool

IsCanceled returns true if the error was caused by cancellation. This typically indicates intentional termination rather than failure, useful for distinguishing between errors that should trigger alerts versus expected shutdowns.

func (*Error[T]) IsTimeout

func (e *Error[T]) IsTimeout() bool

IsTimeout returns true if the error was caused by a timeout. This includes both explicit timeout from the Timeout connector and context deadline exceeded. Useful for implementing timeout-specific retry logic or monitoring.

func (*Error[T]) Unwrap

func (e *Error[T]) Unwrap() error

Unwrap returns the underlying error, supporting error wrapping patterns. This allows use of errors.Is and errors.As with the underlying error, maintaining compatibility with Go's standard error handling patterns.

type Fallback

type Fallback[T any] struct {
	// contains filtered or unexported fields
}

Fallback attempts processors in order, falling back to the next on error. It provides automatic failover through a chain of alternative processors, creating resilient processing chains that recover from failures gracefully.

Unlike Retry which attempts the same operation multiple times, Fallback switches to completely different implementations. Each processor is tried in order until one succeeds or all fail.

Common use cases:

  • Primary/backup/tertiary service failover
  • Graceful degradation strategies
  • Multiple payment provider support
  • Cache miss handling (try local cache, then redis, then database)
  • API version compatibility chains

Example:

var PaymentProvidersID = pipz.NewIdentity("payment-providers", "Payment provider failover")
fallback := pipz.NewFallback(PaymentProvidersID,
    stripeProcessor,       // Try Stripe first
    paypalProcessor,       // Fall back to PayPal on error
    squareProcessor,       // Finally try Square
)

IMPORTANT: Avoid circular references between Fallback instances when all processors fail. Example of DANGEROUS pattern:

fallback1 → fallback2 → fallback3 → fallback1

This creates infinite recursion risk if all processors fail, leading to stack overflow.

func NewFallback

func NewFallback[T any](identity Identity, processors ...Chainable[T]) *Fallback[T]

NewFallback creates a new Fallback connector that tries processors in order. Each processor is tried in order until one succeeds or all fail. If no processors are provided, Process() will return an error.

Examples:

fallback := pipz.NewFallback(PaymentID, stripe, paypal, square)
fallback := pipz.NewFallback(CacheID, redis, database)

func (*Fallback[T]) AddFallback

func (f *Fallback[T]) AddFallback(processor Chainable[T]) *Fallback[T]

AddFallback appends a processor to the end of the fallback chain.

func (*Fallback[T]) Close

func (f *Fallback[T]) Close() error

Close gracefully shuts down the connector and all its child processors. Close is idempotent - multiple calls return the same result.

func (*Fallback[T]) GetFallback

func (f *Fallback[T]) GetFallback() Chainable[T]

GetFallback returns the second processor (for backward compatibility). Returns nil if there's no second processor.

func (*Fallback[T]) GetPrimary

func (f *Fallback[T]) GetPrimary() Chainable[T]

GetPrimary returns the first processor (for backward compatibility).

func (*Fallback[T]) GetProcessors

func (f *Fallback[T]) GetProcessors() []Chainable[T]

GetProcessors returns a copy of all processors in order.

func (*Fallback[T]) Identity

func (f *Fallback[T]) Identity() Identity

Identity returns the identity of this connector.

func (*Fallback[T]) InsertAt

func (f *Fallback[T]) InsertAt(index int, processor Chainable[T]) error

InsertAt inserts a processor at the specified index.

func (*Fallback[T]) Len

func (f *Fallback[T]) Len() int

Len returns the number of processors in the fallback chain.

func (*Fallback[T]) Process

func (f *Fallback[T]) Process(ctx context.Context, data T) (result T, err error)

Process implements the Chainable interface. Tries each processor in order until one succeeds or all fail.

func (*Fallback[T]) RemoveAt

func (f *Fallback[T]) RemoveAt(index int) error

RemoveAt removes the processor at the specified index.

func (*Fallback[T]) Schema

func (f *Fallback[T]) Schema() Node

Schema returns a Node representing this connector in the pipeline schema.

func (*Fallback[T]) SetFallback

func (f *Fallback[T]) SetFallback(processor Chainable[T]) *Fallback[T]

SetFallback updates the second processor (for backward compatibility). If there's no second processor, adds one.

func (*Fallback[T]) SetPrimary

func (f *Fallback[T]) SetPrimary(processor Chainable[T]) *Fallback[T]

SetPrimary updates the first processor (for backward compatibility).

func (*Fallback[T]) SetProcessors

func (f *Fallback[T]) SetProcessors(processors ...Chainable[T]) *Fallback[T]

SetProcessors replaces all processors with the provided ones. If no processors are provided, Process() will return an error.

type FallbackFlow

type FallbackFlow struct {
	Primary Node   `json:"primary"`
	Backups []Node `json:"backups"`
}

FallbackFlow represents primary/backup processing. The primary is tried first; if it fails, backups are tried in order.

func (FallbackFlow) Variant

func (FallbackFlow) Variant() FlowVariant

Variant implements Flow.

type Filter

type Filter[T any] struct {
	// contains filtered or unexported fields
}

Filter creates a conditional processor that either continues the pipeline unchanged or executes a processor based on a predicate function.

Filter provides a clean way to implement conditional processing without complex if-else logic scattered throughout your code. When the condition returns true, the processor is executed. When false, data passes through unchanged with no errors.

This is ideal for:

  • Feature flags (process only for enabled users)
  • A/B testing (apply changes to test group)
  • Optional processing steps based on data state
  • Business rules that apply to subset of data
  • Conditional enrichment or validation
  • Performance optimizations (skip expensive operations)

Unlike Switch which routes to different processors, Filter either processes or passes through. Unlike Mutate which only supports transformations that cannot fail, Filter can execute any Chainable including ones that may error.

Example - Feature flag processing:

var FeatureFlagID = pipz.NewIdentity("feature-flag", "Gates the new algorithm")
enableNewFeature := pipz.NewFilter(FeatureFlagID,
    func(ctx context.Context, user User) bool {
        return user.BetaEnabled && isFeatureEnabled(ctx, "new-algorithm")
    },
    newAlgorithmProcessor,
)

Example - Conditional validation:

var (
    PremiumValidationID = pipz.NewIdentity("premium-validation", "Extra checks for premium orders")
    PremiumChecksID     = pipz.NewIdentity("premium-checks", "Premium order validation sequence")
)

validatePremium := pipz.NewFilter(PremiumValidationID,
    func(ctx context.Context, order Order) bool {
        return order.CustomerTier == "premium"
    },
    pipz.NewSequence(PremiumChecksID,
        validateCreditLimit,
        checkFraudScore,
        verifyIdentity,
    ),
)

The Filter connector is thread-safe and can be safely used in concurrent scenarios. The condition function and processor can be updated at runtime for dynamic behavior.

func NewFilter

func NewFilter[T any](identity Identity, condition func(context.Context, T) bool, processor Chainable[T]) *Filter[T]

NewFilter creates a new Filter connector with the given condition and processor. When condition returns true, processor is executed. When false, data passes through unchanged.

func (*Filter[T]) Close

func (f *Filter[T]) Close() error

Close gracefully shuts down the connector and its child processor. Close is idempotent - multiple calls return the same result.

func (*Filter[T]) Condition

func (f *Filter[T]) Condition() func(context.Context, T) bool

Condition returns the current condition function. Note: this is the function reference, not a deep copy.

func (*Filter[T]) Identity

func (f *Filter[T]) Identity() Identity

Identity returns the identity of this connector.

func (*Filter[T]) Process

func (f *Filter[T]) Process(ctx context.Context, data T) (result T, err error)

Process implements the Chainable interface. Evaluates the condition and either executes the processor or passes data through unchanged.

func (*Filter[T]) Processor

func (f *Filter[T]) Processor() Chainable[T]

Processor returns the current processor.

func (*Filter[T]) Schema

func (f *Filter[T]) Schema() Node

Schema returns a Node representing this connector in the pipeline schema.

func (*Filter[T]) SetCondition

func (f *Filter[T]) SetCondition(condition func(context.Context, T) bool) *Filter[T]

SetCondition updates the condition function. This allows for dynamic behavior changes at runtime.

func (*Filter[T]) SetProcessor

func (f *Filter[T]) SetProcessor(processor Chainable[T]) *Filter[T]

SetProcessor updates the processor to execute when condition is true. This allows for dynamic processor changes at runtime.

type FilterFlow

type FilterFlow struct {
	Processor Node `json:"processor"`
}

FilterFlow represents conditional processing. Items matching the predicate are processed; others pass through.

func (FilterFlow) Variant

func (FilterFlow) Variant() FlowVariant

Variant implements Flow.

type Flow

type Flow interface {
	// Variant returns the type discriminator for this flow.
	Variant() FlowVariant
}

Flow represents how children are organized within a connector node. Each connector type has its own Flow implementation that describes the semantic relationship between the connector and its children.

Leaf nodes (processors) have nil Flow since they have no children.

type FlowKey

type FlowKey[T Flow] struct {
	// contains filtered or unexported fields
}

FlowKey provides bidirectional type-safe extraction for Flow types. This follows the pattern established in capitan for type-safe field extraction.

Example:

if seq, ok := SequenceKey.From(node); ok {
    for _, step := range seq.Steps {
        // process each step
    }
}

func (FlowKey[T]) From

func (FlowKey[T]) From(node Node) (T, bool)

From extracts the typed Flow from a Node. Returns the flow and true if the node's flow matches this key's type, or zero value and false otherwise.

func (FlowKey[T]) Variant

func (k FlowKey[T]) Variant() FlowVariant

Variant returns the flow type this key extracts.

type FlowVariant

type FlowVariant string

FlowVariant is a discriminator for the Flow interface implementation type. Used for runtime type identification when type assertions are needed.

const (
	// Connectors (have children).
	FlowVariantSequence       FlowVariant = "sequence"
	FlowVariantFallback       FlowVariant = "fallback"
	FlowVariantRace           FlowVariant = "race"
	FlowVariantContest        FlowVariant = "contest"
	FlowVariantConcurrent     FlowVariant = "concurrent"
	FlowVariantSwitch         FlowVariant = "switch"
	FlowVariantFilter         FlowVariant = "filter"
	FlowVariantHandle         FlowVariant = "handle"
	FlowVariantScaffold       FlowVariant = "scaffold"
	FlowVariantBackoff        FlowVariant = "backoff"
	FlowVariantRetry          FlowVariant = "retry"
	FlowVariantTimeout        FlowVariant = "timeout"
	FlowVariantRateLimiter    FlowVariant = "ratelimiter"
	FlowVariantCircuitBreaker FlowVariant = "circuitbreaker"
	FlowVariantWorkerpool     FlowVariant = "workerpool"
	FlowVariantPipeline       FlowVariant = "pipeline"

	// Processors (leaf nodes).
	FlowVariantApply     FlowVariant = "apply"
	FlowVariantTransform FlowVariant = "transform"
	FlowVariantEffect    FlowVariant = "effect"
	FlowVariantEnrich    FlowVariant = "enrich"
	FlowVariantMutate    FlowVariant = "mutate"
)

Flow variants for all pipeline node types.

type Handle

type Handle[T any] struct {
	// contains filtered or unexported fields
}

Handle provides error observation and handling for processors. When the wrapped processor fails, Handle passes the error to an error handler for processing (e.g., logging, cleanup, notifications), then passes the original error through.

Common patterns:

  • Log errors with additional context
  • Clean up resources on failure (e.g., release inventory)
  • Send notifications or alerts
  • Collect metrics about failures
  • Implement compensation logic

The error handler receives a Chainable[*Error[T]] with full error context, including the input data, error details, and processing path.

Example:

// Log errors with context
var (
    WithLoggingID = pipz.NewIdentity("with-logging", "Logs order failures")
    LogErrorID    = pipz.NewIdentity("log", "Writes failure details to the log")
)
logged := pipz.NewHandle(
    WithLoggingID,
    processOrder,
    pipz.Effect(LogErrorID, func(ctx context.Context, err *Error[Order]) error {
        log.Printf("order %s failed: %v", err.InputData.ID, err.Err)
        return nil
    }),
)

// Clean up resources on failure
var (
    InventoryCleanupID = pipz.NewIdentity("inventory-cleanup", "Releases inventory on failure")
    ReleaseID          = pipz.NewIdentity("release", "Releases held reservations")
)
withCleanup := pipz.NewHandle(
    InventoryCleanupID,
    reserveAndCharge,
    pipz.Effect(ReleaseID, func(ctx context.Context, err *Error[Order]) error {
        if err.InputData.ReservationID != "" {
            inventory.Release(err.InputData.ReservationID)
        }
        return nil
    }),
)

func NewHandle

func NewHandle[T any](identity Identity, processor Chainable[T], errorHandler Chainable[*Error[T]]) *Handle[T]

NewHandle creates a new Handle connector.

func (*Handle[T]) Close

func (h *Handle[T]) Close() error

Close gracefully shuts down the connector and its child processors. Close is idempotent - multiple calls return the same result.

func (*Handle[T]) GetErrorHandler

func (h *Handle[T]) GetErrorHandler() Chainable[*Error[T]]

GetErrorHandler returns the current error handler.

func (*Handle[T]) GetProcessor

func (h *Handle[T]) GetProcessor() Chainable[T]

GetProcessor returns the current main processor.

func (*Handle[T]) Identity

func (h *Handle[T]) Identity() Identity

Identity returns the identity of this connector.

func (*Handle[T]) Process

func (h *Handle[T]) Process(ctx context.Context, input T) (result T, err error)

Process implements the Chainable interface.

func (*Handle[T]) Schema

func (h *Handle[T]) Schema() Node

Schema returns a Node representing this connector in the pipeline schema.

func (*Handle[T]) SetErrorHandler

func (h *Handle[T]) SetErrorHandler(handler Chainable[*Error[T]]) *Handle[T]

SetErrorHandler updates the error handler.

func (*Handle[T]) SetProcessor

func (h *Handle[T]) SetProcessor(processor Chainable[T]) *Handle[T]

SetProcessor updates the main processor.

type HandleFlow

type HandleFlow struct {
	Processor    Node `json:"processor"`
	ErrorHandler Node `json:"error_handler"`
}

HandleFlow represents error observation and handling. The processor is wrapped; errors flow to the error handler.

func (HandleFlow) Variant

func (HandleFlow) Variant() FlowVariant

Variant implements Flow.

type Identity

type Identity struct {
	// contains filtered or unexported fields
}

Identity provides rich metadata for processors and connectors. It replaces the simple Name type with structured identity information that supports debugging, visualization, and profiling.

Each Identity has an auto-generated UUID that uniquely identifies the processor or connector instance, enabling correlation between schema definitions and runtime signal events.

Example:

var (
    ValidateOrderID = pipz.NewIdentity("validate-order", "Validates order structure")
    EnrichCustomerID = pipz.NewIdentity("enrich-customer", "Adds customer details from CRM")
)

pipeline := pipz.NewSequence(PipelineID,
    pipz.Apply(ValidateOrderID, validateOrder),
    pipz.Enrich(EnrichCustomerID, enrichCustomer),
)

func NewIdentity

func NewIdentity(name, description string) Identity

NewIdentity creates a new Identity with an auto-generated UUID. The name should be a short, descriptive identifier (e.g., "validate-order"). The description provides additional context for debugging and documentation.

func (Identity) Description

func (i Identity) Description() string

Description returns the optional description.

func (Identity) ID

func (i Identity) ID() uuid.UUID

ID returns the unique identifier for this processor or connector.

func (Identity) Name

func (i Identity) Name() string

Name returns the human-readable name.

func (Identity) String

func (i Identity) String() string

String implements fmt.Stringer, returning the name for convenient logging.

type Node

type Node struct {
	Identity Identity       `json:"-"`
	Type     string         `json:"type"`
	Flow     Flow           `json:"flow,omitempty"`
	Metadata map[string]any `json:"metadata,omitempty"`
}

Node represents a node in the pipeline schema tree. It provides a serializable representation of the pipeline structure for visualization, debugging, and tooling.

The schema can be generated at build time via the Schema() method on any Chainable, providing a complete picture of the pipeline structure without executing it.

For connector nodes (Sequence, Fallback, etc.), the Flow field contains semantic child relationships. For processor nodes (Apply, Transform, etc.), Flow is nil since they are leaf nodes.

Example:

pipeline := pipz.NewSequence(PipelineID,
    pipz.Apply(ValidateID, validate),
    pipz.NewFallback(FallbackID,
        pipz.Apply(PrimaryID, primary),
        pipz.Apply(BackupID, backup),
    ),
)

schema := pipeline.Schema()
jsonBytes, _ := json.MarshalIndent(schema, "", "  ")
fmt.Println(string(jsonBytes))

func (Node) MarshalJSON

func (n Node) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler. It flattens the Identity into separate id, name, and description fields.

func (*Node) UnmarshalJSON

func (n *Node) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler. Note: The Identity UUID will be regenerated on unmarshal since the original UUID cannot be reconstructed from the JSON. Note: Flow unmarshaling is not supported - schemas are built from pipelines, not JSON.

type Pipeline

type Pipeline[T any] struct {
	// contains filtered or unexported fields
}

Pipeline wraps a Chainable to define a semantic execution context. It provides correlation between runtime signals and pipeline identity by injecting execution and pipeline IDs into the context.

Each Process() call generates a unique execution ID, while the pipeline ID remains stable (derived from the Pipeline's identity). Nested connectors can extract these IDs to include in signals, enabling distributed tracing and observability.

Example:

// Define a semantic pipeline
orderPipeline := pipz.NewPipeline(
    pipz.NewIdentity("order-processing", "Main order flow"),
    pipz.NewSequence(internalID, validate, enrich, save),
)

// All signals from nested connectors get correlation IDs
result, err := orderPipeline.Process(ctx, order)

// In signal handlers, extract IDs for correlation
if execID, ok := pipz.ExecutionIDFromContext(ctx); ok {
    log.Printf("Execution %s completed", execID)
}

func NewPipeline

func NewPipeline[T any](identity Identity, root Chainable[T]) *Pipeline[T]

NewPipeline creates a Pipeline that wraps a Chainable with execution context. The identity defines the semantic pipeline name for correlation purposes, separate from the identities of the components within.

func (*Pipeline[T]) Close

func (p *Pipeline[T]) Close() error

Close gracefully shuts down the wrapped Chainable.

func (*Pipeline[T]) Identity

func (p *Pipeline[T]) Identity() Identity

Identity returns the pipeline's identity.

func (*Pipeline[T]) Process

func (p *Pipeline[T]) Process(ctx context.Context, data T) (T, error)

Process executes the wrapped Chainable with execution context. Each call generates a unique execution ID and injects both the execution ID and pipeline ID into the context before delegating to the root Chainable.

func (*Pipeline[T]) Schema

func (p *Pipeline[T]) Schema() Node

Schema returns a Node representing this pipeline in the schema. The pipeline wraps the root's schema as a child.

type PipelineFlow

type PipelineFlow struct {
	Root Node `json:"root"`
}

PipelineFlow represents a semantic execution context wrapper. It wraps a root chainable with execution tracking metadata.

func (PipelineFlow) Variant

func (PipelineFlow) Variant() FlowVariant

Variant implements Flow.

type Processor

type Processor[T any] struct {
	// contains filtered or unexported fields
}

Processor defines an identified processing stage that transforms a value of type T. It contains an Identity for debugging and visualization, and a private function that processes the value. The function receives a context for cancellation and timeout control.

Processor is the basic building block created by adapter functions like Apply, Transform, Effect, Mutate, and Enrich. The identity field is crucial for debugging, appearing in error messages and the Error[T].Path to identify exactly where failures occur. The identity's UUID enables correlation between schema definitions and runtime signal events.

The fn field is intentionally private to ensure processors are only created through the provided adapter functions, maintaining consistent error handling and path tracking.

Best practices for processor identities:

  • Use descriptive, action-oriented names ("validate-email", not "email")
  • Include the operation type ("parse-json", "fetch-user", "log-event")
  • Keep names concise but meaningful
  • Use descriptions to document the processor's purpose
  • Identities appear in Error[T].Path for debugging

func Apply

func Apply[T any](identity Identity, fn func(context.Context, T) (T, error)) Processor[T]

Apply creates a Processor from a function that transforms data and may return an error. Apply is the workhorse processor - use it when your transformation might fail due to validation, parsing, external API calls, or business rule violations.

The function receives a context for timeout/cancellation support. Long-running operations should check ctx.Err() periodically. On error, the pipeline stops immediately and returns the error wrapped with debugging context.

Apply is ideal for:

  • Data validation with transformation
  • API calls that return modified data
  • Database lookups that enhance data
  • Parsing operations that might fail
  • Business rule enforcement

For pure transformations that can't fail, use Transform for better performance. For operations that should continue on failure, use Enrich.

Example:

var ParseJSONID = pipz.NewIdentity("parse-json", "Parses JSON input into Data struct")
parseJSON := pipz.Apply(ParseJSONID, func(ctx context.Context, raw string) (Data, error) {
    var data Data
    if err := json.Unmarshal([]byte(raw), &data); err != nil {
        return Data{}, fmt.Errorf("invalid JSON: %w", err)
    }
    return data, nil
})

func Effect

func Effect[T any](identity Identity, fn func(context.Context, T) error) Processor[T]

Effect creates a Processor that performs side effects without modifying the data. Effect is for operations that need to happen alongside your main processing flow, such as logging, metrics collection, notifications, or audit trails.

The function receives the data for inspection but must not modify it. Any returned error stops the pipeline immediately. The original data always passes through unchanged, making Effect perfect for:

  • Logging important events or data states
  • Recording metrics (counts, latencies, values)
  • Sending notifications or alerts
  • Writing audit logs for compliance
  • Triggering external systems
  • Validating without transformation

Unlike Apply, Effect cannot transform data. Unlike Transform, it can fail. This separation ensures side effects are explicit and testable.

Example:

var AuditPaymentID = pipz.NewIdentity("audit-payment", "Logs payment to audit trail")
auditLog := pipz.Effect(AuditPaymentID, func(ctx context.Context, payment Payment) error {
    return auditLogger.Log(ctx, "payment_processed", map[string]any{
        "amount": payment.Amount,
        "user_id": payment.UserID,
        "timestamp": time.Now(),
    })
})

func Enrich

func Enrich[T any](identity Identity, fn func(context.Context, T) (T, error)) Processor[T]

Enrich creates a Processor that attempts to enhance data with additional information. Enrich is unique among processors - if the enrichment fails, it returns the original data unchanged rather than stopping the pipeline. This makes it ideal for optional enhancements that improve data quality but aren't critical for processing.

The enrichment function should fetch additional data and return an enhanced version. Common enrichment patterns include:

  • Adding user details from a cache or database
  • Geocoding addresses to add coordinates
  • Fetching current prices or exchange rates
  • Looking up metadata from external services
  • Adding computed fields from external data

Use Enrich when the additional data is "nice to have" but not required. If the enrichment is mandatory, use Apply instead. Enrich swallows errors to ensure pipeline continuity, so consider logging failures within the enrichment function.

Example:

var AddCustomerNameID = pipz.NewIdentity("add-customer-name", "Enriches order with customer name from CRM")
addCustomerName := pipz.Enrich(AddCustomerNameID, func(ctx context.Context, order Order) (Order, error) {
    customer, err := customerService.Get(ctx, order.CustomerID)
    if err != nil {
        // Log but don't fail - order processing continues without name
        log.Printf("failed to enrich customer data: %v", err)
        return order, err
    }
    order.CustomerName = customer.Name
    return order, nil
})

func Mutate

func Mutate[T any](identity Identity, transformer func(context.Context, T) T, condition func(context.Context, T) bool) Processor[T]

Mutate creates a Processor that conditionally transforms data based on a predicate. Mutate combines a condition check with a transformation, applying the transformer only when the condition returns true. When false, data passes through unchanged.

This pattern is cleaner than embedding if-statements in Transform functions and makes the condition explicit and testable. Use Mutate for:

  • Feature flags (transform only for enabled users)
  • A/B testing (apply changes to test group)
  • Conditional formatting based on data values
  • Environment-specific transformations
  • Business rules that apply to a subset of data

The condition and transformer are separate functions for better testability and reusability. The transformer cannot fail - use Apply with conditional logic if you need error handling.

Example:

var PremiumDiscountID = pipz.NewIdentity("premium-discount", "Applies 10% discount for premium customers")
discountPremium := pipz.Mutate(PremiumDiscountID,
    func(ctx context.Context, order Order) Order {
        order.Total *= 0.9  // 10% discount
        return order
    },
    func(ctx context.Context, order Order) bool {
        return order.CustomerTier == "premium" && order.Total > 100
    },
)

func Transform

func Transform[T any](identity Identity, fn func(context.Context, T) T) Processor[T]

Transform creates a Processor that applies a pure transformation function to data. Transform is the simplest processor - use it when your operation always succeeds and always modifies the data in a predictable way.

The transformation function cannot fail, making Transform ideal for:

  • Data formatting (uppercase, trimming, parsing that can't fail)
  • Mathematical calculations that can't error
  • Field mapping or restructuring
  • Adding computed fields

If your transformation might fail (e.g., parsing, validation), use Apply instead. If you need conditional transformation, use Mutate.

Example:

var UppercaseID = pipz.NewIdentity("uppercase", "Converts text to uppercase")
uppercase := pipz.Transform(UppercaseID, func(ctx context.Context, s string) string {
    return strings.ToUpper(s)
})

func (Processor[T]) Close

func (Processor[T]) Close() error

Close gracefully shuts down any resources.

func (Processor[T]) Identity

func (p Processor[T]) Identity() Identity

Identity returns the identity of the processor for debugging and error reporting.

func (Processor[T]) Process

func (p Processor[T]) Process(ctx context.Context, data T) (result T, err error)

Process implements the Chainable interface, allowing individual processors to be used directly or composed in connectors.

This means a single Processor can be used anywhere a Chainable is expected:

validator := pipz.Effect(ValidateID, validateFunc)
// Can be used directly
result, err := validator.Process(ctx, data)
// Or in connectors
pipeline := pipz.NewSequence(PipelineID, validator, transformer)

func (Processor[T]) Schema

func (p Processor[T]) Schema() Node

Schema returns a Node representing this processor in the pipeline schema. Processors are leaf nodes with type "processor".

type Race

type Race[T Cloner[T]] struct {
	// contains filtered or unexported fields
}

Race runs all processors in parallel and returns the result of the first to complete successfully. Race implements competitive processing where speed matters more than which specific processor succeeds. The first successful result wins and cancels all other processors.

Context handling: Race uses context.WithCancel(ctx) to create a derived context that preserves all parent context values (including trace IDs) while allowing cancellation of losing processors when a winner is found.

The input type T must implement the Cloner[T] interface to provide efficient, type-safe copying without reflection. This ensures predictable performance and allows types to control their own copying semantics.

This pattern excels when you have multiple ways to get the same result and want the fastest one:

  • Querying multiple replicas or regions
  • Trying different algorithms with varying performance
  • Fetching from multiple caches
  • Calling primary and backup services simultaneously
  • Any scenario where latency matters more than the specific source

Key behaviors:

  • First success wins and cancels others
  • If all processors fail, the last error is returned
  • Each processor gets an isolated copy via Clone()
  • Useful for reducing p99 latencies
  • Can increase load (all processors run)

Example:

// UserQuery must implement Cloner[UserQuery]
var FetchUserID = pipz.NewIdentity("fetch-user", "Fetches user from fastest source")
race := pipz.NewRace(FetchUserID,
    fetchFromLocalCache,
    fetchFromRegionalCache,
    fetchFromDatabase,
)

func NewRace

func NewRace[T Cloner[T]](identity Identity, processors ...Chainable[T]) *Race[T]

NewRace creates a new Race connector.

func (*Race[T]) Add

func (r *Race[T]) Add(processor Chainable[T]) *Race[T]

Add appends a processor to the race execution list.

func (*Race[T]) Clear

func (r *Race[T]) Clear() *Race[T]

Clear removes all processors from the race execution list.

func (*Race[T]) Close

func (r *Race[T]) Close() error

Close gracefully shuts down the connector and all its child processors. Close is idempotent - multiple calls return the same result.

func (*Race[T]) Identity

func (r *Race[T]) Identity() Identity

Identity returns the identity of this connector.

func (*Race[T]) Len

func (r *Race[T]) Len() int

Len returns the number of processors.

func (*Race[T]) Process

func (r *Race[T]) Process(ctx context.Context, input T) (result T, err error)

Process implements the Chainable interface.

func (*Race[T]) Remove

func (r *Race[T]) Remove(index int) error

Remove removes the processor at the specified index.

func (*Race[T]) Schema

func (r *Race[T]) Schema() Node

Schema returns a Node representing this connector in the pipeline schema.

func (*Race[T]) SetProcessors

func (r *Race[T]) SetProcessors(processors ...Chainable[T]) *Race[T]

SetProcessors replaces all processors atomically.

type RaceFlow

type RaceFlow struct {
	Competitors []Node `json:"competitors"`
}

RaceFlow represents parallel execution where first success wins. All competitors start simultaneously; first to succeed cancels others.

func (RaceFlow) Variant

func (RaceFlow) Variant() FlowVariant

Variant implements Flow.

type RateLimiter

type RateLimiter[T any] struct {
	// contains filtered or unexported fields
}

RateLimiter controls the rate of processing to protect downstream services. RateLimiter wraps a processor and uses a token bucket algorithm to enforce rate limits, allowing controlled bursts while maintaining a steady average rate. This is essential for protecting external APIs, databases, and other rate-sensitive resources.

CRITICAL: RateLimiter is a STATEFUL connector that maintains an internal token bucket. Create it once and reuse it - do NOT create a new RateLimiter for each request, as that would reset the token bucket and rate limiting would not work.

❌ WRONG - Creating per request (useless):

func handleRequest(req Request) Response {
    limiter := pipz.NewRateLimiter(LimiterID, 100, 10, apiCall)  // NEW limiter each time!
    return limiter.Process(ctx, req)                             // Always allows through
}

✅ RIGHT - Create once, reuse:

var apiLimiter = pipz.NewRateLimiter(LimiterID, 100, 10, apiCall)  // Shared instance

func handleRequest(req Request) Response {
    return apiLimiter.Process(ctx, req)  // Actually rate limits
}

The limiter operates in two modes:

  • "wait": Blocks until a token is available (default)
  • "drop": Returns an error immediately if no tokens available

RateLimiter is particularly useful for:

  • API client implementations with rate limits
  • Database connection throttling
  • Preventing overwhelming downstream services
  • Implementing fair resource sharing
  • Meeting SLA requirements

Best Practices:

  • Use const names for all processors/connectors (see best-practices.md)
  • Create RateLimiters once and reuse them (e.g., as struct fields or package variables)
  • Configure limits based on actual downstream capacity
  • Layer multiple limiters for complex scenarios (global → service → endpoint)

Example:

var (
    APILimiterID = pipz.NewIdentity("api-limiter", "Rate limits Stripe API calls")
    ChargeID     = pipz.NewIdentity("charge", "Process payment charge")
)

// Create limiter wrapping the API call
var stripeLimiter = pipz.NewRateLimiter(APILimiterID, 100, 10,
    pipz.Apply(ChargeID, processStripeCharge),
)

// Use in pipeline
func createPaymentPipeline() pipz.Chainable[Payment] {
    return pipz.NewSequence(PipelineID,
        validatePayment,
        stripeLimiter,  // Rate-limited API call
        confirmPayment,
    )
}

func NewRateLimiter

func NewRateLimiter[T any](identity Identity, ratePerSecond float64, burst int, processor Chainable[T]) *RateLimiter[T]

NewRateLimiter creates a new RateLimiter connector wrapping the given processor. The ratePerSecond parameter sets the sustained rate limit. The burst parameter sets the maximum burst size.

func (*RateLimiter[T]) Close

func (r *RateLimiter[T]) Close() error

Close gracefully shuts down the connector and its wrapped processor. Close is idempotent - multiple calls return the same result.

func (*RateLimiter[T]) GetAvailableTokens

func (r *RateLimiter[T]) GetAvailableTokens() float64

GetAvailableTokens returns the current number of available tokens. This method is primarily intended for testing and debugging.

func (*RateLimiter[T]) GetBurst

func (r *RateLimiter[T]) GetBurst() int

GetBurst returns the current burst capacity.

func (*RateLimiter[T]) GetMode

func (r *RateLimiter[T]) GetMode() string

GetMode returns the current mode ("wait" or "drop").

func (*RateLimiter[T]) GetRate

func (r *RateLimiter[T]) GetRate() float64

GetRate returns the current rate limit.

func (*RateLimiter[T]) Identity

func (r *RateLimiter[T]) Identity() Identity

Identity returns the identity of this connector.

func (*RateLimiter[T]) Process

func (r *RateLimiter[T]) Process(ctx context.Context, data T) (result T, err error)

Process implements the Chainable interface.

func (*RateLimiter[T]) Schema

func (r *RateLimiter[T]) Schema() Node

Schema returns a Node representing this connector in the pipeline schema.

func (*RateLimiter[T]) SetBurst

func (r *RateLimiter[T]) SetBurst(burst int) *RateLimiter[T]

SetBurst updates the burst capacity.

func (*RateLimiter[T]) SetMode

func (r *RateLimiter[T]) SetMode(mode string) *RateLimiter[T]

SetMode sets the rate limiting mode ("wait" or "drop").

func (*RateLimiter[T]) SetRate

func (r *RateLimiter[T]) SetRate(ratePerSecond float64) *RateLimiter[T]

SetRate updates the rate limit (requests per second).

func (*RateLimiter[T]) WithClock

func (r *RateLimiter[T]) WithClock(clock clockz.Clock) *RateLimiter[T]

WithClock sets the clock implementation for testing purposes. This method is primarily intended for testing with FakeClock.

type RateLimiterFlow

type RateLimiterFlow struct {
	Processor Node `json:"processor"`
}

RateLimiterFlow represents processing with rate limiting.

func (RateLimiterFlow) Variant

func (RateLimiterFlow) Variant() FlowVariant

Variant implements Flow.

type Retry

type Retry[T any] struct {
	// contains filtered or unexported fields
}

Retry attempts the processor up to maxAttempts times. Retry provides simple retry logic for operations that may fail transiently. It immediately retries on failure without delay, making it suitable for quick operations or when failures are expected to clear immediately.

Each retry uses the same input data. Context cancellation is checked between attempts to allow for early termination. If all attempts fail, the last error is returned with attempt count information for debugging.

Use Retry for:

  • Network calls with transient failures
  • Database operations during brief contention
  • File operations with temporary locks
  • Any operation with intermittent failures

For operations needing delay between retries, use RetryWithBackoff. For trying different approaches, use Fallback instead.

Example:

var RetryID = pipz.NewIdentity("retry-db", "Retries database write")
retry := pipz.NewRetry(
    RetryID,
    databaseWriter,
    3,  // Try up to 3 times
)

func NewRetry

func NewRetry[T any](identity Identity, processor Chainable[T], maxAttempts int) *Retry[T]

NewRetry creates a new Retry connector.

func (*Retry[T]) Close

func (r *Retry[T]) Close() error

Close gracefully shuts down the connector and its child processor. Close is idempotent - multiple calls return the same result.

func (*Retry[T]) GetMaxAttempts

func (r *Retry[T]) GetMaxAttempts() int

GetMaxAttempts returns the current maximum attempts setting.

func (*Retry[T]) Identity

func (r *Retry[T]) Identity() Identity

Identity returns the identity of this connector.

func (*Retry[T]) Process

func (r *Retry[T]) Process(ctx context.Context, data T) (result T, err error)

Process implements the Chainable interface.

func (*Retry[T]) Schema

func (r *Retry[T]) Schema() Node

Schema returns a Node representing this connector in the pipeline schema.

func (*Retry[T]) SetMaxAttempts

func (r *Retry[T]) SetMaxAttempts(n int) *Retry[T]

SetMaxAttempts updates the maximum number of retry attempts.

type RetryFlow

type RetryFlow struct {
	Processor Node `json:"processor"`
}

RetryFlow represents processing with simple retry logic.

func (RetryFlow) Variant

func (RetryFlow) Variant() FlowVariant

Variant implements Flow.

type Scaffold

type Scaffold[T Cloner[T]] struct {
	// contains filtered or unexported fields
}

Scaffold runs all processors in parallel with context isolation for true fire-and-forget behavior. Unlike Concurrent, Scaffold uses context.WithoutCancel to ensure processors continue running even if the parent context is canceled. This is ideal for operations that must complete regardless of the main pipeline's state.

The input type T must implement the Cloner[T] interface to provide efficient, type-safe copying without reflection. This ensures predictable performance and allows types to control their own copying semantics.

Use Scaffold when you need:

  • True fire-and-forget operations that outlive the request
  • Background tasks that shouldn't be canceled with the main flow
  • Cleanup or logging operations that must complete
  • Non-critical side effects that shouldn't block the pipeline

Common use cases:

  • Asynchronous audit logging
  • Background cache warming
  • Non-critical notifications
  • Metrics collection
  • Cleanup tasks that should complete independently

Important characteristics:

  • Input type must implement Cloner[T] interface
  • Processors continue even after parent context cancellation
  • Returns immediately without waiting for completion
  • Original input always returned unchanged
  • No error reporting from background processors
  • Trace context is preserved (but cancellation is not)

Example:

var ScaffoldID = pipz.NewIdentity("async-operations", "Fire-and-forget side effects")
scaffold := pipz.NewScaffold(ScaffoldID,
    asyncAuditLog,
    warmCache,
    collectMetrics,
)

// Returns immediately, processors run in background
result, err := scaffold.Process(ctx, order)

func NewScaffold

func NewScaffold[T Cloner[T]](identity Identity, processors ...Chainable[T]) *Scaffold[T]

NewScaffold creates a new Scaffold connector.

func (*Scaffold[T]) Add

func (s *Scaffold[T]) Add(processor Chainable[T]) *Scaffold[T]

Add appends a processor to the scaffold execution list.

func (*Scaffold[T]) Clear

func (s *Scaffold[T]) Clear() *Scaffold[T]

Clear removes all processors from the scaffold execution list.

func (*Scaffold[T]) Close

func (s *Scaffold[T]) Close() error

Close gracefully shuts down the connector and all its child processors. Close is idempotent - multiple calls return the same result.

func (*Scaffold[T]) Identity

func (s *Scaffold[T]) Identity() Identity

Identity returns the identity of this connector.

func (*Scaffold[T]) Len

func (s *Scaffold[T]) Len() int

Len returns the number of processors.

func (*Scaffold[T]) Process

func (s *Scaffold[T]) Process(ctx context.Context, input T) (result T, err error)

Process implements the Chainable interface.

func (*Scaffold[T]) Remove

func (s *Scaffold[T]) Remove(index int) error

Remove removes the processor at the specified index.

func (*Scaffold[T]) Schema

func (s *Scaffold[T]) Schema() Node

Schema returns a Node representing this connector in the pipeline schema.

func (*Scaffold[T]) SetProcessors

func (s *Scaffold[T]) SetProcessors(processors ...Chainable[T]) *Scaffold[T]

SetProcessors replaces all processors atomically.

type ScaffoldFlow

type ScaffoldFlow struct {
	Processors []Node `json:"processors"`
}

ScaffoldFlow represents fire-and-forget parallel execution. Processors run asynchronously without blocking the main flow.

func (ScaffoldFlow) Variant

func (ScaffoldFlow) Variant() FlowVariant

Variant implements Flow.

type Schema

type Schema struct {
	Root Node `json:"root"`
}

Schema represents a complete pipeline schema. It wraps the root Node and provides utilities for traversal and serialization.

func NewSchema

func NewSchema(root Node) Schema

NewSchema creates a Schema from a pipeline's root node.

func (Schema) Count

func (s Schema) Count() int

Count returns the total number of nodes in the schema.

func (Schema) Find

func (s Schema) Find(predicate func(Node) bool) *Node

Find returns the first node matching the predicate, or nil if not found.

func (Schema) FindByName

func (s Schema) FindByName(name string) *Node

FindByName returns the first node with the given name, or nil if not found.

func (Schema) FindByType

func (s Schema) FindByType(nodeType string) []Node

FindByType returns all nodes of the given type.

func (Schema) Walk

func (s Schema) Walk(fn func(Node))

Walk traverses the schema tree, calling fn for each node. Traversal is depth-first, pre-order.

type Sequence

type Sequence[T any] struct {
	// contains filtered or unexported fields
}

Sequence provides a type-safe sequence for processing values of type T. It maintains an ordered list of processors that are executed sequentially.

Sequence offers a rich API with methods to dynamically modify the processor chain. This makes it ideal for scenarios where the processing steps need to be configured at runtime or modified based on conditions.

Key features:

  • Thread-safe for concurrent access
  • Dynamic modification of processor chain
  • Identity-based processors for debugging and visualization
  • Rich API for reordering and modification
  • Fail-fast execution with detailed errors

Sequence is the primary way to chain processors together.

func NewSequence

func NewSequence[T any](identity Identity, processors ...Chainable[T]) *Sequence[T]

NewSequence creates a new Sequence with optional initial processors. The sequence is ready to use immediately and can be safely accessed concurrently. Additional processors can be added using Register or the various modification methods.

Example:

// Single line declaration
var (
    UserProcessingID = pipz.NewIdentity("user-processing", "Main user processing pipeline")
    ValidateID = pipz.NewIdentity("validate", "Validates user input")
    EnrichID = pipz.NewIdentity("enrich", "Enriches user with external data")
    AuditID = pipz.NewIdentity("audit", "Logs user actions for audit")
)
sequence := pipz.NewSequence(UserProcessingID,
    pipz.Effect(ValidateID, validateUser),
    pipz.Apply(EnrichID, enrichUser),
    pipz.Effect(AuditID, auditUser),
)

// Or create empty and add later
sequence := pipz.NewSequence[User](UserProcessingID)
sequence.Register(
    pipz.Effect(ValidateID, validateUser),
    pipz.Apply(EnrichID, enrichUser),
)

func (*Sequence[T]) After

func (c *Sequence[T]) After(afterID Identity, processors ...Chainable[T]) error

After inserts processors after the first processor with the specified identity.

func (*Sequence[T]) Before

func (c *Sequence[T]) Before(beforeID Identity, processors ...Chainable[T]) error

Before inserts processors before the first processor with the specified identity.

func (*Sequence[T]) Clear

func (c *Sequence[T]) Clear()

Clear removes all processors from the Sequence.

func (*Sequence[T]) Close

func (c *Sequence[T]) Close() error

Close gracefully shuts down the connector and all its child processors. Processors are closed in reverse order (LIFO) to mirror typical resource cleanup patterns. Close is idempotent - multiple calls return the same result.

func (*Sequence[T]) Identity

func (c *Sequence[T]) Identity() Identity

Identity returns the identity of this sequence.

func (*Sequence[T]) Len

func (c *Sequence[T]) Len() int

Len returns the number of processors in the Sequence.

func (*Sequence[T]) Names

func (c *Sequence[T]) Names() []string

Names returns the names of all processors in order.

func (*Sequence[T]) Pop

func (c *Sequence[T]) Pop() (Chainable[T], error)

Pop removes and returns the last processor.

func (*Sequence[T]) Process

func (c *Sequence[T]) Process(ctx context.Context, value T) (result T, err error)

Process executes all registered processors on the input value. Each processor receives the output of the previous processor. The context is checked before each processor execution - if the context is canceled or expired, processing stops immediately. If any processor returns an error, execution stops and an Error[T] is returned with rich debugging information.

Process is thread-safe and can be called concurrently. The sequence's processor list is locked during execution to prevent modifications.

Error handling includes:

  • Processor name and stage index for debugging
  • Original input data that caused the failure
  • Execution duration for performance analysis
  • Timeout/cancellation detection

Context best practices:

  • Always use context with timeout for production
  • Check ctx.Err() in long-running processors
  • Pass context through to external calls

func (*Sequence[T]) Push

func (c *Sequence[T]) Push(processors ...Chainable[T])

Push adds processors to the back of the Sequence (runs last).

func (*Sequence[T]) Register

func (c *Sequence[T]) Register(processors ...Chainable[T])

Register adds processors to this Sequence. Processors are executed in the order they are registered.

This method is thread-safe and can be called concurrently. New processors are appended to the existing chain, making Register ideal for building sequences incrementally:

sequence := pipz.NewSequence[Order](OrderProcessingID)
sequence.Register(pipz.Apply(ValidateOrderID, validateOrder))
sequence.Register(pipz.Apply(TaxID, calculateTax), pipz.Apply(DiscountID, applyDiscount))
if config.RequiresApproval {
    sequence.Register(pipz.Apply(ApprovalID, requireApproval))
}

func (*Sequence[T]) Remove

func (c *Sequence[T]) Remove(id Identity) error

Remove removes the first processor with the specified identity.

func (*Sequence[T]) Replace

func (c *Sequence[T]) Replace(id Identity, processor Chainable[T]) error

Replace replaces the first processor with the specified identity.

func (*Sequence[T]) Schema

func (c *Sequence[T]) Schema() Node

Schema returns a Node representing this sequence in the pipeline schema.

func (*Sequence[T]) Shift

func (c *Sequence[T]) Shift() (Chainable[T], error)

Shift removes and returns the first processor.

func (*Sequence[T]) Unshift

func (c *Sequence[T]) Unshift(processors ...Chainable[T])

Unshift adds processors to the front of the Sequence (runs first).

type SequenceFlow

type SequenceFlow struct {
	Steps []Node `json:"steps"`
}

SequenceFlow represents an ordered sequence of processing steps. Each step is executed in order, with the output of each step becoming the input of the next.

func (SequenceFlow) Variant

func (SequenceFlow) Variant() FlowVariant

Variant implements Flow.

type Switch

type Switch[T any] struct {
	// contains filtered or unexported fields
}

Switch routes to different processors based on condition result. Switch enables conditional processing where the path taken depends on the input data. The condition function examines the data and returns a route key that determines which processor to use.

If no route exists for the returned key, the input passes through unchanged.

Switch is perfect for:

  • Status-based workflows with defined states
  • Region-specific logic
  • Priority handling
  • A/B testing with experiment names
  • Dynamic routing tables that change at runtime
  • Feature flag controlled processing paths

Example:

const (
    RouteStandard   = "standard"
    RouteHighValue  = "high_value"
    RouteCrypto     = "crypto"
)

router := pipz.NewSwitch(SwitchID,
    func(ctx context.Context, p Payment) string {
        if p.Amount > 10000 {
            return RouteHighValue
        } else if p.Method == "crypto" {
            return RouteCrypto
        }
        return RouteStandard
    },
)
router.AddRoute(RouteStandard, standardProcessor)
router.AddRoute(RouteHighValue, highValueProcessor)
router.AddRoute(RouteCrypto, cryptoProcessor)

func NewSwitch

func NewSwitch[T any](identity Identity, condition Condition[T]) *Switch[T]

NewSwitch creates a new Switch connector with the given condition function.

func (*Switch[T]) AddRoute

func (s *Switch[T]) AddRoute(key string, processor Chainable[T]) *Switch[T]

AddRoute adds or updates a route in the switch.

func (*Switch[T]) ClearRoutes

func (s *Switch[T]) ClearRoutes() *Switch[T]

ClearRoutes removes all routes from the switch.

func (*Switch[T]) Close

func (s *Switch[T]) Close() error

Close gracefully shuts down the connector and all its route processors. Close is idempotent - multiple calls return the same result.

func (*Switch[T]) HasRoute

func (s *Switch[T]) HasRoute(key string) bool

HasRoute checks if a route exists for the given key.

func (*Switch[T]) Identity

func (s *Switch[T]) Identity() Identity

Identity returns the identity of this connector.

func (*Switch[T]) Process

func (s *Switch[T]) Process(ctx context.Context, data T) (result T, err error)

Process implements the Chainable interface. If no route matches the condition result, the input is returned unchanged.

func (*Switch[T]) RemoveRoute

func (s *Switch[T]) RemoveRoute(key string) *Switch[T]

RemoveRoute removes a route from the switch.

func (*Switch[T]) Routes

func (s *Switch[T]) Routes() map[string]Chainable[T]

Routes returns a copy of the current routes map.

func (*Switch[T]) Schema

func (s *Switch[T]) Schema() Node

Schema returns a Node representing this connector in the pipeline schema.

func (*Switch[T]) SetCondition

func (s *Switch[T]) SetCondition(condition Condition[T]) *Switch[T]

SetCondition updates the condition function.

func (*Switch[T]) SetRoutes

func (s *Switch[T]) SetRoutes(routes map[string]Chainable[T]) *Switch[T]

SetRoutes replaces all routes in the switch atomically.

type SwitchFlow

type SwitchFlow struct {
	Routes map[string]Node `json:"routes"`
}

SwitchFlow represents conditional routing to different processors. The condition determines which route key to use.

func (SwitchFlow) Variant

func (SwitchFlow) Variant() FlowVariant

Variant implements Flow.

type Timeout

type Timeout[T any] struct {
	// contains filtered or unexported fields
}

Timeout enforces a timeout on the processor's execution. Timeout wraps any processor with a hard time limit, ensuring operations complete within acceptable bounds. If the timeout expires, the operation is canceled via context and a timeout error is returned.

This connector is critical for:

  • Preventing hung operations
  • Meeting SLA requirements
  • Protecting against slow external services
  • Ensuring predictable system behavior
  • Resource management in concurrent systems

The wrapped operation should respect context cancellation for immediate termination. Operations that ignore context may continue running in the background even after timeout.

Timeout is often combined with Retry for robust error handling:

pipz.NewRetry(RetryID, pipz.NewTimeout(TimeoutID, operation, 5*time.Second), 3)

Example:

var TimeoutID = pipz.NewIdentity("user-service-timeout", "Enforces 2s timeout")
timeout := pipz.NewTimeout(
    TimeoutID,
    userServiceCall,
    2*time.Second,  // Must complete within 2 seconds
)

func NewTimeout

func NewTimeout[T any](identity Identity, processor Chainable[T], duration time.Duration) *Timeout[T]

NewTimeout creates a new Timeout connector.

func (*Timeout[T]) Close

func (t *Timeout[T]) Close() error

Close gracefully shuts down the timeout connector and its child processor. Close is idempotent - multiple calls return the same result.

func (*Timeout[T]) GetDuration

func (t *Timeout[T]) GetDuration() time.Duration

GetDuration returns the current timeout duration.

func (*Timeout[T]) Identity

func (t *Timeout[T]) Identity() Identity

Identity returns the identity of this connector.

func (*Timeout[T]) Process

func (t *Timeout[T]) Process(ctx context.Context, data T) (result T, err error)

Process implements the Chainable interface.

func (*Timeout[T]) Schema

func (t *Timeout[T]) Schema() Node

Schema returns a Node representing this connector in the pipeline schema.

func (*Timeout[T]) SetDuration

func (t *Timeout[T]) SetDuration(d time.Duration) *Timeout[T]

SetDuration updates the timeout duration.

func (*Timeout[T]) WithClock

func (t *Timeout[T]) WithClock(clock clockz.Clock) *Timeout[T]

WithClock sets a custom clock for testing.

type TimeoutFlow

type TimeoutFlow struct {
	Processor Node `json:"processor"`
}

TimeoutFlow represents processing with a timeout constraint.

func (TimeoutFlow) Variant

func (TimeoutFlow) Variant() FlowVariant

Variant implements Flow.

type WorkerPool

type WorkerPool[T Cloner[T]] struct {
	// contains filtered or unexported fields
}

WorkerPool provides bounded parallel execution with a fixed number of workers. Uses semaphore pattern to limit concurrent processor execution while maintaining the same API and behavior as other connectors in the pipz ecosystem.

The input type T must implement Cloner[T] to provide safe concurrent processing. Each processor receives an isolated copy of the input data.

Example:

var PoolID = pipz.NewIdentity("api-calls", "Fan-out API calls")
pool := pipz.NewWorkerPool(PoolID, 5,
    pipz.Apply(ServiceAID, callServiceA),
    pipz.Apply(ServiceBID, callServiceB),
    pipz.Apply(ServiceCID, callServiceC),
)

func NewWorkerPool

func NewWorkerPool[T Cloner[T]](identity Identity, workers int, processors ...Chainable[T]) *WorkerPool[T]

NewWorkerPool creates a WorkerPool with specified worker count. Workers parameter controls maximum concurrent processors (semaphore slots).

func (*WorkerPool[T]) Add

func (w *WorkerPool[T]) Add(processor Chainable[T]) *WorkerPool[T]

Add appends a processor to the worker pool execution list.

func (*WorkerPool[T]) Clear

func (w *WorkerPool[T]) Clear() *WorkerPool[T]

Clear removes all processors from the worker pool execution list.

func (*WorkerPool[T]) Close

func (w *WorkerPool[T]) Close() error

Close gracefully shuts down the worker pool and all its child processors. Close is idempotent - multiple calls return the same result.

func (*WorkerPool[T]) GetActiveWorkers

func (w *WorkerPool[T]) GetActiveWorkers() int

GetActiveWorkers returns the number of currently active workers.

func (*WorkerPool[T]) GetWorkerCount

func (w *WorkerPool[T]) GetWorkerCount() int

GetWorkerCount returns the maximum number of concurrent workers.

func (*WorkerPool[T]) Identity

func (w *WorkerPool[T]) Identity() Identity

Identity returns the identity of this connector.

func (*WorkerPool[T]) Len

func (w *WorkerPool[T]) Len() int

Len returns the number of processors.

func (*WorkerPool[T]) Process

func (w *WorkerPool[T]) Process(ctx context.Context, input T) (result T, err error)

Process implements the Chainable interface.

func (*WorkerPool[T]) Remove

func (w *WorkerPool[T]) Remove(index int) error

Remove removes the processor at the specified index.

func (*WorkerPool[T]) Schema

func (w *WorkerPool[T]) Schema() Node

Schema returns a Node representing this connector in the pipeline schema.

func (*WorkerPool[T]) SetProcessors

func (w *WorkerPool[T]) SetProcessors(processors ...Chainable[T]) *WorkerPool[T]

SetProcessors replaces all processors atomically.

func (*WorkerPool[T]) SetWorkerCount

func (w *WorkerPool[T]) SetWorkerCount(workers int) *WorkerPool[T]

SetWorkerCount adjusts the worker pool size by recreating the semaphore.

func (*WorkerPool[T]) WithClock

func (w *WorkerPool[T]) WithClock(clock clockz.Clock) *WorkerPool[T]

WithClock sets a custom clock for testing.

func (*WorkerPool[T]) WithTimeout

func (w *WorkerPool[T]) WithTimeout(timeout time.Duration) *WorkerPool[T]

WithTimeout sets per-task timeout. Each processor must complete within this duration.

type WorkerpoolFlow

type WorkerpoolFlow struct {
	Processors []Node `json:"processors"`
}

WorkerpoolFlow represents processing distributed across a worker pool.

func (WorkerpoolFlow) Variant

func (WorkerpoolFlow) Variant() FlowVariant

Variant implements Flow.

Directories

Path Synopsis
Package testing provides test utilities and helpers for pipz-based applications.
