Documentation
Overview ¶
Package pipz provides a lightweight, type-safe library for building composable data processing pipelines in Go.
pipz enables developers to create clean, testable, and maintainable data processing workflows by composing small, focused functions into larger pipelines. It addresses common challenges in Go applications such as scattered business logic, repetitive error handling, and difficult-to-test code that mixes pure logic with external dependencies.
Installation ¶
go get github.com/zoobz-io/pipz
Requires Go 1.24+ for generic type constraints.
Core Concepts ¶
The library is built around a single, uniform interface:
type Chainable[T any] interface {
	Process(context.Context, T) (T, error)
	Identity() Identity
	Schema() Node
	Close() error
}
Key components:
- Processors: Individual processing steps created with adapter functions (Transform, Apply, etc.)
- Connectors: Compose multiple processors into complex flows (Sequence, Switch, Concurrent, etc.)
- Sequence: The primary way to build sequential pipelines with runtime modification support
Design philosophy:
- Processors are immutable values (simple functions wrapped with metadata)
- Connectors are mutable pointers (configurable containers with state)
Everything implements Chainable[T], enabling seamless composition while maintaining type safety through Go generics. Context support provides timeout control and cancellation. Execution follows a fail-fast pattern where processing stops at the first error.
Adapter Functions ¶
Adapters wrap your functions to implement the Chainable interface:
Transform - Pure transformations that cannot fail:
var DoubleName = pipz.NewIdentity("double", "")
double := pipz.Transform(DoubleName, func(_ context.Context, n int) int {
return n * 2
})
Apply - Operations that can fail:
var ParseJSONName = pipz.NewIdentity("parse", "")
parseJSON := pipz.Apply(ParseJSONName, func(_ context.Context, s string) (Data, error) {
var d Data
return d, json.Unmarshal([]byte(s), &d)
})
Effect - Side effects without modifying data:
var LoggerName = pipz.NewIdentity("log", "")
logger := pipz.Effect(LoggerName, func(_ context.Context, d Data) error {
log.Printf("Processing: %+v", d)
return nil
})
Mutate - Conditional modifications:
var DiscountName = pipz.NewIdentity("discount", "")
discountPremium := pipz.Mutate(DiscountName,
func(_ context.Context, u User) User { u.Discount = 0.2; return u },
func(_ context.Context, u User) bool { return u.IsPremium },
)
Enrich - Optional enhancements that log failures:
var GeoEnrichName = pipz.NewIdentity("geo", "")
addLocation := pipz.Enrich(GeoEnrichName, func(ctx context.Context, u User) (User, error) {
	country, err := detectCountry(ctx, u.IP)
	if err != nil {
		return u, err // Failure is logged, but won't stop the pipeline
	}
	u.Country = country
	return u, nil
})
Connectors ¶
Connectors compose multiple Chainables. Choose based on your needs:
Sequential Processing:
var PipelineName = pipz.NewIdentity("pipeline", "")
pipeline := pipz.NewSequence(PipelineName, step1, step2, step3)
// Or build dynamically:
var DynamicName = pipz.NewIdentity("dynamic", "")
seq := pipz.NewSequence[T](DynamicName)
seq.Register(step1, step2)
seq.PushTail(step3) // Add at runtime
Parallel Processing (requires T to implement Cloner[T]):
// Run all processors, return original data
var ParallelName = pipz.NewIdentity("parallel", "")
concurrent := pipz.NewConcurrent(ParallelName, proc1, proc2, proc3)
// Return first successful result
var FastestName = pipz.NewIdentity("fastest", "")
race := pipz.NewRace(FastestName, primary, secondary, tertiary)
// Return first result meeting a condition
var BestName = pipz.NewIdentity("best", "")
contest := pipz.NewContest(BestName, conditionFunc, option1, option2, option3)
Error Handling:
// Try fallback on error
var SafeName = pipz.NewIdentity("safe", "")
fallback := pipz.NewFallback(SafeName, primary, backup)
// Retry with attempts
var ResilientName = pipz.NewIdentity("resilient", "")
retry := pipz.NewRetry(ResilientName, processor, 3)
// Retry with exponential backoff
var ApiCallName = pipz.NewIdentity("api-call", "")
backoff := pipz.NewBackoff(ApiCallName, processor, 5, time.Second)
// Handle errors without changing data flow
var ObservedName = pipz.NewIdentity("observed", "")
handle := pipz.NewHandle(ObservedName, processor, errorPipeline)
Control Flow:
// Route based on conditions
var RouterName = pipz.NewIdentity("router", "")
router := pipz.NewSwitch(RouterName, func(ctx context.Context, d Data) string {
if d.Type == "premium" { return "premium-flow" }
return "standard-flow"
})
router.AddRoute("premium-flow", premiumProcessor)
router.AddRoute("standard-flow", standardProcessor)
// Enforce timeouts
var DeadlineName = pipz.NewIdentity("deadline", "")
timeout := pipz.NewTimeout(DeadlineName, processor, 30*time.Second)
// Conditional processing
var FeatureFlagName = pipz.NewIdentity("feature-flag", "")
filter := pipz.NewFilter(FeatureFlagName,
func(ctx context.Context, u User) bool { return u.BetaEnabled },
betaProcessor,
)
Resource Protection:
// Rate limiting
var ApiLimitName = pipz.NewIdentity("api-limit", "")
rateLimiter := pipz.NewRateLimiter(ApiLimitName, 100, 10) // 100/sec, burst 10
rateLimiter.SetMode("drop") // Or "wait" (default)
// Circuit breaker
var ServiceBreakerName = pipz.NewIdentity("service-breaker", "")
breaker := pipz.NewCircuitBreaker(ServiceBreakerName, processor, 5, 30*time.Second)
Quick Start ¶
Simple example - transform strings through a pipeline:
package main
import (
"context"
"strings"
"github.com/zoobz-io/pipz"
)
func main() {
// Define processor identities (NewIdentity returns a value, so these must be vars, not consts)
var (
	TrimName          = pipz.NewIdentity("trim", "")
	UpperName         = pipz.NewIdentity("uppercase", "")
	TextProcessorName = pipz.NewIdentity("text-processor", "")
)
// Create processors
trim := pipz.Transform(TrimName, func(_ context.Context, s string) string {
return strings.TrimSpace(s)
})
upper := pipz.Transform(UpperName, func(_ context.Context, s string) string {
return strings.ToUpper(s)
})
// Method 1: Direct composition
pipeline := pipz.NewSequence(TextProcessorName, trim, upper)
// Method 2: Build dynamically
sequence := pipz.NewSequence[string](TextProcessorName)
sequence.Register(trim, upper)
// Execute
result, err := pipeline.Process(context.Background(), " hello world ")
// result: "HELLO WORLD", err: nil
}
Implementing Cloner[T] ¶
For parallel processing with Concurrent, Race, and the other parallel connectors, types must implement Cloner[T] so each branch receives an independent copy:
type Order struct {
ID string
Items []Item // Slice needs copying
Meta map[string]any // Map needs copying
}
func (o Order) Clone() Order {
// Deep copy slice
items := make([]Item, len(o.Items))
for i, item := range o.Items {
items[i] = item.Clone() // If Item also has references
}
// Deep copy map
meta := make(map[string]any, len(o.Meta))
for k, v := range o.Meta {
meta[k] = v // Adjust based on value types
}
return Order{ID: o.ID, Items: items, Meta: meta}
}
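A quick way to validate a Clone implementation is to mutate the clone and confirm the original is untouched. The sketch below is standalone, with a minimal hypothetical Item type filled in to make the Order example from above compile:

```go
package main

import "fmt"

// Item is a minimal stand-in; a value type with no references clones by copy.
type Item struct{ SKU string }

func (i Item) Clone() Item { return i }

type Order struct {
	ID    string
	Items []Item
	Meta  map[string]any
}

// Clone deep-copies the slice and map so parallel branches cannot share state.
func (o Order) Clone() Order {
	items := make([]Item, len(o.Items))
	for i, item := range o.Items {
		items[i] = item.Clone()
	}
	meta := make(map[string]any, len(o.Meta))
	for k, v := range o.Meta {
		meta[k] = v
	}
	return Order{ID: o.ID, Items: items, Meta: meta}
}

func main() {
	orig := Order{ID: "A1", Items: []Item{{SKU: "x"}}, Meta: map[string]any{"src": "web"}}
	c := orig.Clone()
	c.Items[0].SKU = "changed" // mutate the clone...
	c.Meta["src"] = "api"
	fmt.Println(orig.Items[0].SKU, orig.Meta["src"]) // ...original is unchanged: x web
}
```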
Choosing the Right Connector ¶
- Sequence (via NewSequence): Default choice for step-by-step processing; also supports runtime modification (Push, Pop, Replace, and friends)
- Switch: For conditional routing based on data
- Filter: For conditional processing (execute or skip)
- Concurrent: For parallel independent operations (requires Cloner[T])
- Race: When you need the fastest result
- Contest: When you need the fastest result that meets criteria
- Fallback: For primary/backup patterns
- Retry/Backoff: For handling transient failures
- Timeout: For operations that might hang
- Handle: For error monitoring without changing flow
- RateLimiter: For protecting rate-limited resources
- CircuitBreaker: For preventing cascade failures
Error Handling ¶
pipz provides rich error information through the Error[T] type:
type Error[T any] struct {
Path []Identity // Full path of Identity values through the pipeline
InputData T // The input that caused the failure
Err error // The underlying error
Timestamp time.Time // When the error occurred
Duration time.Duration // How long before failure
Timeout bool // Was it a timeout?
Canceled bool // Was it canceled?
}
Error handling example:
result, err := pipeline.Process(ctx, data)
if err != nil {
var pipeErr *pipz.Error[Data]
if errors.As(err, &pipeErr) {
// pipeErr.Error() formats the path automatically
log.Printf("Pipeline error: %v", pipeErr)
log.Printf("Input data: %+v", pipeErr.InputData)
log.Printf("After: %v", pipeErr.Duration)
if pipeErr.Timeout {
// Handle timeout specifically
}
}
}
Performance ¶
pipz is designed for exceptional performance:
- Transform: 2.7ns per operation with zero allocations
- Apply/Effect (success): 46ns per operation with zero allocations
- Basic pipeline overhead: ~88 bytes, 3 allocations (constant regardless of pipeline length)
- Linear scaling: 5-step pipeline ~560ns, 50-step pipeline ~2.8μs
- No reflection or runtime type assertions
- Predictable performance characteristics
See PERFORMANCE.md for detailed benchmarks.
Best Practices ¶
- Keep processors small and focused on a single responsibility
- Use descriptive names for processors to aid debugging
- Implement Cloner[T] correctly for types used with Concurrent/Race
- Use NewSequence() for both static and dynamic pipelines
- Check context.Err() in long-running processors
- Let errors bubble up - handle at pipeline level
- Use Effect for side effects to maintain purity
- Test processors in isolation before composing
- Prefer Transform over Apply when errors aren't possible
- Use timeouts at the pipeline level, not on individual processors
Common Patterns ¶
Validation Pipeline:
var (
	ValidationName = pipz.NewIdentity("validation", "")
	RequiredName   = pipz.NewIdentity("required", "")
	FormatName     = pipz.NewIdentity("format", "")
	SanitizeName   = pipz.NewIdentity("sanitize", "")
)
validation := pipz.NewSequence(ValidationName,
pipz.Effect(RequiredName, checkRequired),
pipz.Effect(FormatName, checkFormat),
pipz.Apply(SanitizeName, sanitizeInput),
)
API with Retry and Timeout:
var (
	ApiTimeoutName = pipz.NewIdentity("api-timeout", "")
	ApiRetryName   = pipz.NewIdentity("api-retry", "")
	FetchName      = pipz.NewIdentity("fetch", "")
)
apiCall := pipz.NewTimeout(ApiTimeoutName,
pipz.NewBackoff(ApiRetryName,
pipz.Apply(FetchName, fetchFromAPI),
3, time.Second,
),
30*time.Second,
)
Multi-path Processing:
var TypeRouterName = pipz.NewIdentity("type-router", "")
processor := pipz.NewSwitch(TypeRouterName, detectType)
processor.AddRoute("json", jsonProcessor)
processor.AddRoute("xml", xmlProcessor)
processor.AddRoute("csv", csvProcessor)
For more examples, see the examples directory.
Index ¶
- Variables
- func ExecutionIDFromContext(ctx context.Context) (uuid.UUID, bool)
- func PipelineIDFromContext(ctx context.Context) (uuid.UUID, bool)
- type Backoff
- func (b *Backoff[T]) Close() error
- func (b *Backoff[T]) GetBaseDelay() time.Duration
- func (b *Backoff[T]) GetMaxAttempts() int
- func (b *Backoff[T]) Identity() Identity
- func (b *Backoff[T]) Process(ctx context.Context, data T) (result T, err error)
- func (b *Backoff[T]) Schema() Node
- func (b *Backoff[T]) SetBaseDelay(d time.Duration) *Backoff[T]
- func (b *Backoff[T]) SetMaxAttempts(n int) *Backoff[T]
- func (b *Backoff[T]) WithClock(clock clockz.Clock) *Backoff[T]
- type BackoffFlow
- type Chainable
- type CircuitBreaker
- func (cb *CircuitBreaker[T]) Close() error
- func (cb *CircuitBreaker[T]) GetFailureThreshold() int
- func (cb *CircuitBreaker[T]) GetResetTimeout() time.Duration
- func (cb *CircuitBreaker[T]) GetState() string
- func (cb *CircuitBreaker[T]) GetSuccessThreshold() int
- func (cb *CircuitBreaker[T]) Identity() Identity
- func (cb *CircuitBreaker[T]) Process(ctx context.Context, data T) (result T, err error)
- func (cb *CircuitBreaker[T]) Reset() *CircuitBreaker[T]
- func (cb *CircuitBreaker[T]) Schema() Node
- func (cb *CircuitBreaker[T]) SetFailureThreshold(n int) *CircuitBreaker[T]
- func (cb *CircuitBreaker[T]) SetResetTimeout(d time.Duration) *CircuitBreaker[T]
- func (cb *CircuitBreaker[T]) SetSuccessThreshold(n int) *CircuitBreaker[T]
- func (cb *CircuitBreaker[T]) WithClock(clock clockz.Clock) *CircuitBreaker[T]
- type CircuitBreakerFlow
- type Cloner
- type Concurrent
- func (c *Concurrent[T]) Add(processor Chainable[T]) *Concurrent[T]
- func (c *Concurrent[T]) Clear() *Concurrent[T]
- func (c *Concurrent[T]) Close() error
- func (c *Concurrent[T]) Identity() Identity
- func (c *Concurrent[T]) Len() int
- func (c *Concurrent[T]) Process(ctx context.Context, input T) (result T, err error)
- func (c *Concurrent[T]) Remove(index int) error
- func (c *Concurrent[T]) Schema() Node
- func (c *Concurrent[T]) SetProcessors(processors ...Chainable[T]) *Concurrent[T]
- type ConcurrentFlow
- type Condition
- type Contest
- func (c *Contest[T]) Add(processor Chainable[T]) *Contest[T]
- func (c *Contest[T]) Clear() *Contest[T]
- func (c *Contest[T]) Close() error
- func (c *Contest[T]) Identity() Identity
- func (c *Contest[T]) Len() int
- func (c *Contest[T]) Process(ctx context.Context, input T) (result T, err error)
- func (c *Contest[T]) Remove(index int) error
- func (c *Contest[T]) Schema() Node
- func (c *Contest[T]) SetCondition(condition func(context.Context, T) bool) *Contest[T]
- func (c *Contest[T]) SetProcessors(processors ...Chainable[T]) *Contest[T]
- type ContestFlow
- type Error
- type Fallback
- func (f *Fallback[T]) AddFallback(processor Chainable[T]) *Fallback[T]
- func (f *Fallback[T]) Close() error
- func (f *Fallback[T]) GetFallback() Chainable[T]
- func (f *Fallback[T]) GetPrimary() Chainable[T]
- func (f *Fallback[T]) GetProcessors() []Chainable[T]
- func (f *Fallback[T]) Identity() Identity
- func (f *Fallback[T]) InsertAt(index int, processor Chainable[T]) error
- func (f *Fallback[T]) Len() int
- func (f *Fallback[T]) Process(ctx context.Context, data T) (result T, err error)
- func (f *Fallback[T]) RemoveAt(index int) error
- func (f *Fallback[T]) Schema() Node
- func (f *Fallback[T]) SetFallback(processor Chainable[T]) *Fallback[T]
- func (f *Fallback[T]) SetPrimary(processor Chainable[T]) *Fallback[T]
- func (f *Fallback[T]) SetProcessors(processors ...Chainable[T]) *Fallback[T]
- type FallbackFlow
- type Filter
- func (f *Filter[T]) Close() error
- func (f *Filter[T]) Condition() func(context.Context, T) bool
- func (f *Filter[T]) Identity() Identity
- func (f *Filter[T]) Process(ctx context.Context, data T) (result T, err error)
- func (f *Filter[T]) Processor() Chainable[T]
- func (f *Filter[T]) Schema() Node
- func (f *Filter[T]) SetCondition(condition func(context.Context, T) bool) *Filter[T]
- func (f *Filter[T]) SetProcessor(processor Chainable[T]) *Filter[T]
- type FilterFlow
- type Flow
- type FlowKey
- type FlowVariant
- type Handle
- func (h *Handle[T]) Close() error
- func (h *Handle[T]) GetErrorHandler() Chainable[*Error[T]]
- func (h *Handle[T]) GetProcessor() Chainable[T]
- func (h *Handle[T]) Identity() Identity
- func (h *Handle[T]) Process(ctx context.Context, input T) (result T, err error)
- func (h *Handle[T]) Schema() Node
- func (h *Handle[T]) SetErrorHandler(handler Chainable[*Error[T]]) *Handle[T]
- func (h *Handle[T]) SetProcessor(processor Chainable[T]) *Handle[T]
- type HandleFlow
- type Identity
- type Node
- type Pipeline
- type PipelineFlow
- type Processor
- func Apply[T any](identity Identity, fn func(context.Context, T) (T, error)) Processor[T]
- func Effect[T any](identity Identity, fn func(context.Context, T) error) Processor[T]
- func Enrich[T any](identity Identity, fn func(context.Context, T) (T, error)) Processor[T]
- func Mutate[T any](identity Identity, transformer func(context.Context, T) T, ...) Processor[T]
- func Transform[T any](identity Identity, fn func(context.Context, T) T) Processor[T]
- type Race
- func (r *Race[T]) Add(processor Chainable[T]) *Race[T]
- func (r *Race[T]) Clear() *Race[T]
- func (r *Race[T]) Close() error
- func (r *Race[T]) Identity() Identity
- func (r *Race[T]) Len() int
- func (r *Race[T]) Process(ctx context.Context, input T) (result T, err error)
- func (r *Race[T]) Remove(index int) error
- func (r *Race[T]) Schema() Node
- func (r *Race[T]) SetProcessors(processors ...Chainable[T]) *Race[T]
- type RaceFlow
- type RateLimiter
- func (r *RateLimiter[T]) Close() error
- func (r *RateLimiter[T]) GetAvailableTokens() float64
- func (r *RateLimiter[T]) GetBurst() int
- func (r *RateLimiter[T]) GetMode() string
- func (r *RateLimiter[T]) GetRate() float64
- func (r *RateLimiter[T]) Identity() Identity
- func (r *RateLimiter[T]) Process(ctx context.Context, data T) (result T, err error)
- func (r *RateLimiter[T]) Schema() Node
- func (r *RateLimiter[T]) SetBurst(burst int) *RateLimiter[T]
- func (r *RateLimiter[T]) SetMode(mode string) *RateLimiter[T]
- func (r *RateLimiter[T]) SetRate(ratePerSecond float64) *RateLimiter[T]
- func (r *RateLimiter[T]) WithClock(clock clockz.Clock) *RateLimiter[T]
- type RateLimiterFlow
- type Retry
- type RetryFlow
- type Scaffold
- func (s *Scaffold[T]) Add(processor Chainable[T]) *Scaffold[T]
- func (s *Scaffold[T]) Clear() *Scaffold[T]
- func (s *Scaffold[T]) Close() error
- func (s *Scaffold[T]) Identity() Identity
- func (s *Scaffold[T]) Len() int
- func (s *Scaffold[T]) Process(ctx context.Context, input T) (result T, err error)
- func (s *Scaffold[T]) Remove(index int) error
- func (s *Scaffold[T]) Schema() Node
- func (s *Scaffold[T]) SetProcessors(processors ...Chainable[T]) *Scaffold[T]
- type ScaffoldFlow
- type Schema
- type Sequence
- func (c *Sequence[T]) After(afterID Identity, processors ...Chainable[T]) error
- func (c *Sequence[T]) Before(beforeID Identity, processors ...Chainable[T]) error
- func (c *Sequence[T]) Clear()
- func (c *Sequence[T]) Close() error
- func (c *Sequence[T]) Identity() Identity
- func (c *Sequence[T]) Len() int
- func (c *Sequence[T]) Names() []string
- func (c *Sequence[T]) Pop() (Chainable[T], error)
- func (c *Sequence[T]) Process(ctx context.Context, value T) (result T, err error)
- func (c *Sequence[T]) Push(processors ...Chainable[T])
- func (c *Sequence[T]) Register(processors ...Chainable[T])
- func (c *Sequence[T]) Remove(id Identity) error
- func (c *Sequence[T]) Replace(id Identity, processor Chainable[T]) error
- func (c *Sequence[T]) Schema() Node
- func (c *Sequence[T]) Shift() (Chainable[T], error)
- func (c *Sequence[T]) Unshift(processors ...Chainable[T])
- type SequenceFlow
- type Switch
- func (s *Switch[T]) AddRoute(key string, processor Chainable[T]) *Switch[T]
- func (s *Switch[T]) ClearRoutes() *Switch[T]
- func (s *Switch[T]) Close() error
- func (s *Switch[T]) HasRoute(key string) bool
- func (s *Switch[T]) Identity() Identity
- func (s *Switch[T]) Process(ctx context.Context, data T) (result T, err error)
- func (s *Switch[T]) RemoveRoute(key string) *Switch[T]
- func (s *Switch[T]) Routes() map[string]Chainable[T]
- func (s *Switch[T]) Schema() Node
- func (s *Switch[T]) SetCondition(condition Condition[T]) *Switch[T]
- func (s *Switch[T]) SetRoutes(routes map[string]Chainable[T]) *Switch[T]
- type SwitchFlow
- type Timeout
- func (t *Timeout[T]) Close() error
- func (t *Timeout[T]) GetDuration() time.Duration
- func (t *Timeout[T]) Identity() Identity
- func (t *Timeout[T]) Process(ctx context.Context, data T) (result T, err error)
- func (t *Timeout[T]) Schema() Node
- func (t *Timeout[T]) SetDuration(d time.Duration) *Timeout[T]
- func (t *Timeout[T]) WithClock(clock clockz.Clock) *Timeout[T]
- type TimeoutFlow
- type WorkerPool
- func (w *WorkerPool[T]) Add(processor Chainable[T]) *WorkerPool[T]
- func (w *WorkerPool[T]) Clear() *WorkerPool[T]
- func (w *WorkerPool[T]) Close() error
- func (w *WorkerPool[T]) GetActiveWorkers() int
- func (w *WorkerPool[T]) GetWorkerCount() int
- func (w *WorkerPool[T]) Identity() Identity
- func (w *WorkerPool[T]) Len() int
- func (w *WorkerPool[T]) Process(ctx context.Context, input T) (result T, err error)
- func (w *WorkerPool[T]) Remove(index int) error
- func (w *WorkerPool[T]) Schema() Node
- func (w *WorkerPool[T]) SetProcessors(processors ...Chainable[T]) *WorkerPool[T]
- func (w *WorkerPool[T]) SetWorkerCount(workers int) *WorkerPool[T]
- func (w *WorkerPool[T]) WithClock(clock clockz.Clock) *WorkerPool[T]
- func (w *WorkerPool[T]) WithTimeout(timeout time.Duration) *WorkerPool[T]
- type WorkerpoolFlow
Constants ¶
This section is empty.
Variables ¶
var (
	SequenceKey       = FlowKey[SequenceFlow]{/* contains filtered or unexported fields */}
	FallbackKey       = FlowKey[FallbackFlow]{/* contains filtered or unexported fields */}
	RaceKey           = FlowKey[RaceFlow]{/* contains filtered or unexported fields */}
	ContestKey        = FlowKey[ContestFlow]{/* contains filtered or unexported fields */}
	ConcurrentKey     = FlowKey[ConcurrentFlow]{/* contains filtered or unexported fields */}
	SwitchKey         = FlowKey[SwitchFlow]{/* contains filtered or unexported fields */}
	FilterKey         = FlowKey[FilterFlow]{/* contains filtered or unexported fields */}
	HandleKey         = FlowKey[HandleFlow]{/* contains filtered or unexported fields */}
	ScaffoldKey       = FlowKey[ScaffoldFlow]{/* contains filtered or unexported fields */}
	BackoffKey        = FlowKey[BackoffFlow]{/* contains filtered or unexported fields */}
	RetryKey          = FlowKey[RetryFlow]{/* contains filtered or unexported fields */}
	TimeoutKey        = FlowKey[TimeoutFlow]{/* contains filtered or unexported fields */}
	RateLimiterKey    = FlowKey[RateLimiterFlow]{/* contains filtered or unexported fields */}
	CircuitBreakerKey = FlowKey[CircuitBreakerFlow]{/* contains filtered or unexported fields */}
	WorkerpoolKey     = FlowKey[WorkerpoolFlow]{/* contains filtered or unexported fields */}
	PipelineKey       = FlowKey[PipelineFlow]{/* contains filtered or unexported fields */}
)
Pre-defined FlowKeys for each flow type.
var (
	ErrIndexOutOfBounds = errors.New("index out of bounds")
	ErrEmptySequence    = errors.New("sequence is empty")
	ErrInvalidRange     = errors.New("invalid range")
)
Sequence modification errors.
var (
	// CircuitBreaker signals.
	SignalCircuitBreakerOpened = capitan.NewSignal("circuitbreaker.opened",
		"Circuit breaker has transitioned to open state due to exceeding failure threshold")
	SignalCircuitBreakerClosed = capitan.NewSignal("circuitbreaker.closed",
		"Circuit breaker has transitioned to closed state after successful recovery")
	SignalCircuitBreakerHalfOpen = capitan.NewSignal("circuitbreaker.half-open",
		"Circuit breaker has transitioned to half-open state to test if the issue has resolved")
	SignalCircuitBreakerRejected = capitan.NewSignal("circuitbreaker.rejected",
		"Circuit breaker rejected a request because it is in open state")

	// RateLimiter signals.
	SignalRateLimiterThrottled = capitan.NewSignal("ratelimiter.throttled",
		"Rate limiter delayed a request to comply with rate limits")
	SignalRateLimiterDropped = capitan.NewSignal("ratelimiter.dropped",
		"Rate limiter rejected a request because rate limit was exceeded and drop mode is enabled")
	SignalRateLimiterAllowed = capitan.NewSignal("ratelimiter.allowed",
		"Rate limiter allowed a request to proceed")

	// WorkerPool signals.
	SignalWorkerPoolSaturated = capitan.NewSignal("workerpool.saturated",
		"Worker pool has reached maximum capacity and is waiting for available workers")
	SignalWorkerPoolAcquired = capitan.NewSignal("workerpool.acquired",
		"Worker pool acquired a worker slot for processing")
	SignalWorkerPoolReleased = capitan.NewSignal("workerpool.released",
		"Worker pool released a worker slot after processing completed")

	// Retry signals.
	SignalRetryAttemptStart = capitan.NewSignal("retry.attempt-start",
		"Retry connector is starting an execution attempt")
	SignalRetryAttemptFail = capitan.NewSignal("retry.attempt-fail",
		"Retry connector attempt failed and will be retried if attempts remain")
	SignalRetryExhausted = capitan.NewSignal("retry.exhausted",
		"Retry connector has exhausted all retry attempts and is failing")

	// Fallback signals.
	SignalFallbackAttempt = capitan.NewSignal("fallback.attempt",
		"Fallback connector is attempting to execute a processor in the fallback chain")
	SignalFallbackFailed = capitan.NewSignal("fallback.failed",
		"Fallback connector exhausted all processors without success")

	// Timeout signals.
	SignalTimeoutTriggered = capitan.NewSignal("timeout.triggered",
		"Timeout connector canceled execution because the deadline was exceeded")

	// Backoff signals.
	SignalBackoffWaiting = capitan.NewSignal("backoff.waiting",
		"Backoff connector is delaying before the next execution attempt")

	// Sequence signals.
	SignalSequenceCompleted = capitan.NewSignal("sequence.completed",
		"Sequence connector completed processing all processors successfully")

	// Concurrent signals.
	SignalConcurrentCompleted = capitan.NewSignal("concurrent.completed",
		"Concurrent connector completed all parallel processors")

	// Race signals.
	SignalRaceWinner = capitan.NewSignal("race.winner",
		"Race connector determined a winner from parallel processors")

	// Contest signals.
	SignalContestWinner = capitan.NewSignal("contest.winner",
		"Contest connector found a result meeting the condition")

	// Scaffold signals.
	SignalScaffoldDispatched = capitan.NewSignal("scaffold.dispatched",
		"Scaffold connector dispatched processors for background execution")

	// Switch signals.
	SignalSwitchRouted = capitan.NewSignal("switch.routed",
		"Switch connector routed data to a processor based on condition")

	// Filter signals.
	SignalFilterEvaluated = capitan.NewSignal("filter.evaluated",
		"Filter connector evaluated condition and determined whether to process")

	// Handle signals.
	SignalHandleErrorHandled = capitan.NewSignal("handle.error-handled",
		"Handle connector processed an error through the error handler")
)
Signal definitions for pipz connector events. Signals follow the pattern: <connector-type>.<event>.
var (
	// Identity tracking field.
	FieldIdentityID = capitan.NewStringKey("identity_id") // UUID of the component

	// Common fields.
	FieldName      = capitan.NewStringKey("name")       // Connector instance name
	FieldError     = capitan.NewStringKey("error")      // Error message
	FieldTimestamp = capitan.NewFloat64Key("timestamp") // Unix timestamp

	// CircuitBreaker fields.
	FieldState            = capitan.NewStringKey("state")           // Circuit state: closed/open/half-open
	FieldFailures         = capitan.NewIntKey("failures")           // Current failure count
	FieldSuccesses        = capitan.NewIntKey("successes")          // Current success count
	FieldFailureThreshold = capitan.NewIntKey("failure_threshold")  // Threshold to open
	FieldSuccessThreshold = capitan.NewIntKey("success_threshold")  // Threshold to close from half-open
	FieldResetTimeout     = capitan.NewFloat64Key("reset_timeout")  // Reset timeout in seconds
	FieldGeneration       = capitan.NewIntKey("generation")         // Circuit generation number
	FieldLastFailTime     = capitan.NewFloat64Key("last_fail_time") // Last failure timestamp

	// RateLimiter fields.
	FieldRate     = capitan.NewFloat64Key("rate")      // Requests per second
	FieldBurst    = capitan.NewIntKey("burst")         // Burst capacity
	FieldTokens   = capitan.NewFloat64Key("tokens")    // Current tokens
	FieldMode     = capitan.NewStringKey("mode")       // Mode: wait/drop
	FieldWaitTime = capitan.NewFloat64Key("wait_time") // Wait time in seconds

	// WorkerPool fields.
	FieldWorkerCount   = capitan.NewIntKey("worker_count")   // Total worker slots
	FieldActiveWorkers = capitan.NewIntKey("active_workers") // Currently active workers

	// Retry fields.
	FieldAttempt     = capitan.NewIntKey("attempt")      // Current attempt number
	FieldMaxAttempts = capitan.NewIntKey("max_attempts") // Maximum attempts

	// Fallback fields.
	FieldProcessorIndex = capitan.NewIntKey("processor_index")   // Index of processor being tried
	FieldProcessorName  = capitan.NewStringKey("processor_name") // Name of processor being tried

	// Timeout fields.
	FieldDuration = capitan.NewFloat64Key("duration") // Timeout duration in seconds

	// Backoff fields.
	FieldDelay     = capitan.NewFloat64Key("delay")      // Current backoff delay in seconds
	FieldNextDelay = capitan.NewFloat64Key("next_delay") // Next delay if this attempt fails, in seconds

	// Sequence fields.
	FieldProcessorCount = capitan.NewIntKey("processor_count") // Number of processors in chain

	// Concurrent fields.
	FieldErrorCount = capitan.NewIntKey("error_count") // Number of errors from parallel execution

	// Race/Contest fields.
	FieldWinnerName = capitan.NewStringKey("winner_name") // Name of winning processor

	// Switch fields.
	FieldRouteKey = capitan.NewStringKey("route_key") // Key used for routing
	FieldMatched  = capitan.NewBoolKey("matched")     // Whether a route was matched

	// Filter fields.
	FieldPassed = capitan.NewBoolKey("passed") // Whether filter condition passed
)
Common field keys using capitan primitive types. All keys use primitive types to avoid custom struct serialization.
Functions ¶
func ExecutionIDFromContext ¶
ExecutionIDFromContext extracts the execution ID from context. Returns the execution UUID and true if present, or uuid.Nil and false otherwise.
Types ¶
type Backoff ¶
type Backoff[T any] struct { // contains filtered or unexported fields }
Backoff retries the wrapped processor with exponentially increasing delays between attempts, starting at baseDelay and doubling after each failure. This intelligent spacing prevents overwhelming failed services and allows time for transient issues to resolve.
The exponential backoff pattern (delay, 2*delay, 4*delay, ...) is widely used for its effectiveness in handling various failure scenarios without overwhelming systems. The operation can be canceled via context during waits.
Ideal for:
- API calls to rate-limited services
- Database operations during high load
- Distributed system interactions
- Any operation where immediate retry is counterproductive
The total time spent can be significant with multiple retries. For example, with baseDelay=1s and maxAttempts=5:
Delays: 1s, 2s, 4s, 8s (total wait: 15s plus processing time)
func NewBackoff ¶
func NewBackoff[T any](identity Identity, processor Chainable[T], maxAttempts int, baseDelay time.Duration) *Backoff[T]
NewBackoff creates a new Backoff connector.
func (*Backoff[T]) Close ¶
Close gracefully shuts down the connector and its child processor. Close is idempotent - multiple calls return the same result.
func (*Backoff[T]) GetBaseDelay ¶
GetBaseDelay returns the current base delay setting.
func (*Backoff[T]) GetMaxAttempts ¶
GetMaxAttempts returns the current maximum attempts setting.
func (*Backoff[T]) Schema ¶
Schema returns a Node representing this connector in the pipeline schema.
func (*Backoff[T]) SetBaseDelay ¶
SetBaseDelay updates the base delay duration.
func (*Backoff[T]) SetMaxAttempts ¶
SetMaxAttempts updates the maximum number of retry attempts.
type BackoffFlow ¶
type BackoffFlow struct {
Processor Node `json:"processor"`
}
BackoffFlow represents processing with exponential backoff retry.
type Chainable ¶
type Chainable[T any] interface {
	Process(context.Context, T) (T, error)
	Identity() Identity
	Schema() Node
	Close() error
}
Chainable defines the interface for any component that can process values of type T. This interface enables composition of different processing components that operate on the same type.
Chainable is the foundation of pipz - every processor, pipeline, and connector implements this interface. The uniform interface enables seamless composition while maintaining type safety through Go generics.
Key design principles:
- Context support for timeout and cancellation
- Type safety through generics (no interface{})
- Error propagation for fail-fast behavior
- Immutable by convention (return modified copies)
- Identity-based components for debugging, monitoring, and visualization
type CircuitBreaker ¶
type CircuitBreaker[T any] struct { // contains filtered or unexported fields }
CircuitBreaker prevents cascading failures by stopping requests to failing services. CircuitBreaker implements the circuit breaker pattern with three states:
- Closed: Normal operation, requests pass through
- Open: Requests fail immediately without calling the wrapped processor
- Half-Open: Testing state, limited requests to check if service recovered
CRITICAL: CircuitBreaker is a STATEFUL connector that tracks failure counts across requests. Create it once and reuse it - do NOT create a new CircuitBreaker for each request, as that would reset the failure count and the circuit would never open.
❌ WRONG - Creating per request (never opens):

func handleRequest(req Request) Response {
	breaker := pipz.NewCircuitBreaker(pipz.NewIdentity("api", ""), proc, 5, 30*time.Second) // NEW breaker!
	return breaker.Process(ctx, req) // Always closed, failure count always 0
}

✅ RIGHT - Create once, reuse:

var apiBreaker = pipz.NewCircuitBreaker(pipz.NewIdentity("api", ""), apiProcessor, 5, 30*time.Second)

func handleRequest(req Request) Response {
	return apiBreaker.Process(ctx, req) // Tracks failures across requests
}
The circuit opens after consecutive failures reach the threshold. After a timeout period, it transitions to half-open to test recovery. Successful requests in half-open state close the circuit, while failures reopen it.
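The transitions just described can be sketched as a toy state machine. This is an illustration of the documented behavior only, not pipz's internal implementation:

```go
package main

import "fmt"

// toyBreaker illustrates the three circuit states described above.
type toyBreaker struct {
	state     string // "closed", "open", or "half-open"
	failures  int
	threshold int
}

// record applies the documented transition rules: consecutive failures
// reaching the threshold open the circuit; a success (including in
// half-open state) resets the count and closes it.
func (b *toyBreaker) record(success bool) {
	if success {
		b.failures = 0
		b.state = "closed"
		return
	}
	b.failures++
	if b.failures >= b.threshold {
		b.state = "open"
	}
}

func main() {
	b := &toyBreaker{state: "closed", threshold: 3}
	b.record(false)
	b.record(false)
	b.record(false)
	fmt.Println(b.state) // open: three consecutive failures tripped it
	b.state = "half-open" // after resetTimeout, a trial request is allowed
	b.record(true)
	fmt.Println(b.state) // closed again
}
```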
CircuitBreaker is essential for:
- Preventing cascade failures in distributed systems
- Giving failing services time to recover
- Failing fast when services are down
- Reducing unnecessary load on struggling services
- Improving overall system resilience
Best Practices:
- Define Identity values once at package level for all processors/connectors (see best-practices.md)
- Create CircuitBreakers once and reuse them (e.g., as struct fields or package variables)
- Set thresholds based on service characteristics
- Combine with RateLimiter for comprehensive protection
- Monitor circuit state for operational awareness
Example:

// Define identities once at package level
var (
	APIBreakerID = pipz.NewIdentity("api-breaker", "Breaker for the external API")
	DBBreakerID  = pipz.NewIdentity("db-breaker", "Breaker for the database")
	APICallID    = pipz.NewIdentity("api-call", "Calls the external API")
	DBQueryID    = pipz.NewIdentity("db-query", "Queries the database")
)

// Create breakers once and reuse
var (
	// External API - fail fast, longer recovery
	apiBreaker = pipz.NewCircuitBreaker(
		APIBreakerID,
		pipz.Apply(APICallID, callExternalAPI),
		5,              // Open after 5 failures
		30*time.Second, // Try recovery after 30s
	)
	// Internal database - more tolerant
	dbBreaker = pipz.NewCircuitBreaker(
		DBBreakerID,
		pipz.Apply(DBQueryID, queryDatabase),
		10,             // Open after 10 failures
		10*time.Second, // Try recovery after 10s
	)
)

// Combine with rate limiting for full protection
func createResilientPipeline() pipz.Chainable[Request] {
	return pipz.NewSequence(pipz.NewIdentity("resilient-pipeline", ""),
		rateLimiter, // Protect downstream from overload
		apiBreaker,  // Fail fast if service is down
		pipz.NewRetry(pipz.NewIdentity("retry", ""), processor, 3), // Retry transient failures
	)
}
func NewCircuitBreaker ¶
func NewCircuitBreaker[T any](identity Identity, processor Chainable[T], failureThreshold int, resetTimeout time.Duration) *CircuitBreaker[T]
NewCircuitBreaker creates a new CircuitBreaker connector. The failureThreshold sets how many consecutive failures trigger opening. The resetTimeout sets how long to wait before attempting recovery.
func (*CircuitBreaker[T]) Close ¶
func (cb *CircuitBreaker[T]) Close() error
Close gracefully shuts down the connector and its child processor. Close is idempotent - multiple calls return the same result.
func (*CircuitBreaker[T]) GetFailureThreshold ¶
func (cb *CircuitBreaker[T]) GetFailureThreshold() int
GetFailureThreshold returns the current failure threshold.
func (*CircuitBreaker[T]) GetResetTimeout ¶
func (cb *CircuitBreaker[T]) GetResetTimeout() time.Duration
GetResetTimeout returns the current reset timeout.
func (*CircuitBreaker[T]) GetState ¶
func (cb *CircuitBreaker[T]) GetState() string
GetState returns the current circuit state.
func (*CircuitBreaker[T]) GetSuccessThreshold ¶
func (cb *CircuitBreaker[T]) GetSuccessThreshold() int
GetSuccessThreshold returns the current success threshold.
func (*CircuitBreaker[T]) Identity ¶
func (cb *CircuitBreaker[T]) Identity() Identity
Identity returns the identity of this connector.
func (*CircuitBreaker[T]) Process ¶
func (cb *CircuitBreaker[T]) Process(ctx context.Context, data T) (result T, err error)
Process implements the Chainable interface.
func (*CircuitBreaker[T]) Reset ¶
func (cb *CircuitBreaker[T]) Reset() *CircuitBreaker[T]
Reset manually resets the circuit to closed state.
func (*CircuitBreaker[T]) Schema ¶
func (cb *CircuitBreaker[T]) Schema() Node
Schema returns a Node representing this connector in the pipeline schema.
func (*CircuitBreaker[T]) SetFailureThreshold ¶
func (cb *CircuitBreaker[T]) SetFailureThreshold(n int) *CircuitBreaker[T]
SetFailureThreshold updates the consecutive failures needed to open the circuit.
func (*CircuitBreaker[T]) SetResetTimeout ¶
func (cb *CircuitBreaker[T]) SetResetTimeout(d time.Duration) *CircuitBreaker[T]
SetResetTimeout updates the time to wait before attempting recovery.
func (*CircuitBreaker[T]) SetSuccessThreshold ¶
func (cb *CircuitBreaker[T]) SetSuccessThreshold(n int) *CircuitBreaker[T]
SetSuccessThreshold updates the successes needed to close from half-open state.
func (*CircuitBreaker[T]) WithClock ¶
func (cb *CircuitBreaker[T]) WithClock(clock clockz.Clock) *CircuitBreaker[T]
WithClock sets a custom clock for testing.
type CircuitBreakerFlow ¶
type CircuitBreakerFlow struct {
Processor Node `json:"processor"`
}
CircuitBreakerFlow represents processing with circuit breaker protection.
func (CircuitBreakerFlow) Variant ¶
func (CircuitBreakerFlow) Variant() FlowVariant
Variant implements Flow.
type Cloner ¶
type Cloner[T any] interface {
	Clone() T
}
Cloner is an interface for types that can create deep copies of themselves. Implementing this interface is required to use types with Concurrent and Race connectors, providing a type-safe and performant alternative to reflection-based copying.
The Clone method must return a deep copy where modifications to the clone do not affect the original value. For types containing pointers, slices, or maps, ensure these are also copied to achieve true isolation between concurrent processors.
Example implementation:
type Order struct {
ID string
Items []Item
Status string
Metadata map[string]string
}
func (o Order) Clone() Order {
// Deep copy slice
items := make([]Item, len(o.Items))
copy(items, o.Items)
// Deep copy map
metadata := make(map[string]string, len(o.Metadata))
for k, v := range o.Metadata {
metadata[k] = v
}
return Order{
ID: o.ID,
Items: items,
Status: o.Status,
Metadata: metadata,
}
}
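The isolation requirement can be checked with a plain test: mutate the clone and verify the original is unaffected. A standalone sketch, reusing the Order shape from the example above:

```go
package main

import "fmt"

type Item struct{ SKU string }

type Order struct {
	ID       string
	Items    []Item
	Metadata map[string]string
}

// Clone returns a deep copy: the slice and map are duplicated so
// mutations of the clone cannot reach the original.
func (o Order) Clone() Order {
	items := make([]Item, len(o.Items))
	copy(items, o.Items)
	metadata := make(map[string]string, len(o.Metadata))
	for k, v := range o.Metadata {
		metadata[k] = v
	}
	return Order{ID: o.ID, Items: items, Metadata: metadata}
}

func main() {
	orig := Order{ID: "A1", Items: []Item{{SKU: "x"}}, Metadata: map[string]string{"k": "v"}}
	c := orig.Clone()
	c.Items[0].SKU = "changed"
	c.Metadata["k"] = "changed"
	// Deep copy: mutating the clone leaves the original untouched.
	fmt.Println(orig.Items[0].SKU, orig.Metadata["k"]) // x v
}
```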
type Concurrent ¶
type Concurrent[T Cloner[T]] struct {
	// contains filtered or unexported fields
}
Concurrent runs all processors in parallel with the original context preserved. This connector passes the original context directly to each processor, preserving distributed tracing information, spans, and other context values. Each processor receives a deep copy of the input, ensuring complete isolation.
Concurrent supports two modes:
- Without reducer (nil): Returns the original input unchanged after all processors complete
- With reducer: Collects all results and errors, then calls the reducer function to produce the final output
The input type T must implement the Cloner[T] interface to provide efficient, type-safe copying without reflection. This ensures predictable performance and allows types to control their own copying semantics.
Use Concurrent when you need:
- Distributed tracing to work across concurrent operations
- All processors to respect the original context's cancellation
- To wait for all processors to complete before continuing
- Multiple side effects to happen simultaneously
- To aggregate results from parallel operations (with reducer)
Common use cases:
- Sending traced notifications to multiple channels
- Updating multiple external systems with trace context
- Parallel logging with trace IDs preserved
- Fetching data from multiple sources and merging results
- Operations that must all complete or be canceled together
Important characteristics:
- Input type must implement Cloner[T] interface
- All processors run regardless of individual failures
- Context cancellation immediately affects all processors
- Preserves trace context and spans for distributed tracing
- Waits for all processors to complete
- Reducer receives map[Identity]T for results and map[Identity]error for errors (keyed by each processor's Identity)
Example without reducer (side effects):
type Order struct {
ID string
Items []Item
Status string
}
func (o Order) Clone() Order {
items := make([]Item, len(o.Items))
copy(items, o.Items)
return Order{
ID: o.ID,
Items: items,
Status: o.Status,
}
}
var NotifyOrderID = pipz.NewIdentity("notify-order", "Sends notifications for order")
concurrent := pipz.NewConcurrent(
NotifyOrderID,
nil, // no reducer, just run side effects
sendEmailNotification,
sendSMSNotification,
updateInventorySystem,
logToAnalytics,
)
Example with reducer (aggregate results):
type PriceCheck struct {
ProductID string
BestPrice float64
}
func (p PriceCheck) Clone() PriceCheck {
return p
}
reducer := func(original PriceCheck, results map[pipz.Identity]PriceCheck, errors map[pipz.Identity]error) PriceCheck {
	bestPrice := original.BestPrice
	for _, result := range results {
		if result.BestPrice < bestPrice {
			bestPrice = result.BestPrice
		}
	}
	return PriceCheck{ProductID: original.ProductID, BestPrice: bestPrice}
}
var CheckPricesID = pipz.NewIdentity("check-prices", "Checks prices across vendors")
concurrent := pipz.NewConcurrent(
CheckPricesID,
reducer,
checkAmazon,
checkWalmart,
checkTarget,
)
func NewConcurrent ¶
func NewConcurrent[T Cloner[T]](identity Identity, reducer func(original T, results map[Identity]T, errors map[Identity]error) T, processors ...Chainable[T]) *Concurrent[T]
NewConcurrent creates a new Concurrent connector. If reducer is nil, the original input is returned unchanged. If reducer is provided, it receives the original input, all processor results, and any errors (keyed by processor Identity), allowing you to aggregate or merge results into a new T.
func (*Concurrent[T]) Add ¶
func (c *Concurrent[T]) Add(processor Chainable[T]) *Concurrent[T]
Add appends a processor to the concurrent execution list.
func (*Concurrent[T]) Clear ¶
func (c *Concurrent[T]) Clear() *Concurrent[T]
Clear removes all processors from the concurrent execution list.
func (*Concurrent[T]) Close ¶
func (c *Concurrent[T]) Close() error
Close gracefully shuts down the connector and all its child processors. Close is idempotent - multiple calls return the same result.
func (*Concurrent[T]) Identity ¶
func (c *Concurrent[T]) Identity() Identity
Identity returns the identity of this connector.
func (*Concurrent[T]) Process ¶
func (c *Concurrent[T]) Process(ctx context.Context, input T) (result T, err error)
Process implements the Chainable interface.
func (*Concurrent[T]) Remove ¶
func (c *Concurrent[T]) Remove(index int) error
Remove removes the processor at the specified index.
func (*Concurrent[T]) Schema ¶
func (c *Concurrent[T]) Schema() Node
Schema returns a Node representing this connector in the pipeline schema.
func (*Concurrent[T]) SetProcessors ¶
func (c *Concurrent[T]) SetProcessors(processors ...Chainable[T]) *Concurrent[T]
SetProcessors replaces all processors atomically.
type ConcurrentFlow ¶
type ConcurrentFlow struct {
Tasks []Node `json:"tasks"`
}
ConcurrentFlow represents parallel execution of independent tasks. All tasks run simultaneously; results are combined into a single output.
func (ConcurrentFlow) Variant ¶
func (ConcurrentFlow) Variant() FlowVariant
Variant implements Flow.
type Condition ¶
Condition determines routing based on input data. Returns a route key string for multi-way branching.
Define string constants for type-safe routing:
const (
RouteStandard = "standard"
RouteHighValue = "high_value"
RouteCrypto = "crypto"
)
Common patterns include routing by:
- Status strings for workflow states
- Region identifiers
- Priority levels as strings
- Feature flag names
type Contest ¶
type Contest[T Cloner[T]] struct {
	// contains filtered or unexported fields
}
Contest runs all processors in parallel and returns the first result that meets a specified condition. Contest combines competitive processing (like Race) with conditional selection, allowing you to define what makes a "winner" beyond just being first to complete.
Context handling: Contest uses context.WithCancel(ctx) to create a derived context that preserves all parent context values (including trace IDs) while allowing cancellation of other processors when a winner meeting the condition is found.
The input type T must implement the Cloner[T] interface to provide efficient, type-safe copying without reflection. This ensures predictable performance and allows types to control their own copying semantics.
This pattern excels when you have multiple ways to get a result and want the fastest one that meets specific criteria:
- Finding the cheapest shipping rate under a time constraint
- Getting the first API response with required data completeness
- Querying multiple sources for the best quality result quickly
- Racing services where the "best" result matters more than just "first"
- Any scenario where you need speed AND quality criteria
Key behaviors:
- First result meeting the condition wins and cancels others
- If no results meet the condition, returns the original input with an error
- Each processor gets an isolated copy via Clone()
- Condition is evaluated as results arrive (no waiting for all)
- Can reduce latency while ensuring quality constraints
Example:
// Find the first shipping rate under $50
contest := pipz.NewContest(pipz.NewIdentity("cheapest-rate", ""),
	func(_ context.Context, rate Rate) bool {
		return rate.Cost < 50.00
	},
	fedexRates,
	upsRates,
	uspsRates,
)
func NewContest ¶
func NewContest[T Cloner[T]](identity Identity, condition func(context.Context, T) bool, processors ...Chainable[T]) *Contest[T]
NewContest creates a new Contest connector with the specified winning condition. The condition function determines which results are acceptable winners. A result must both complete successfully AND meet the condition to win.
func (*Contest[T]) Close ¶
Close gracefully shuts down the connector and all its child processors. Close is idempotent - multiple calls return the same result.
func (*Contest[T]) Schema ¶
Schema returns a Node representing this connector in the pipeline schema.
func (*Contest[T]) SetCondition ¶
SetCondition updates the winning condition. This allows changing the criteria at runtime.
func (*Contest[T]) SetProcessors ¶
SetProcessors replaces all processors atomically.
type ContestFlow ¶
type ContestFlow struct {
Competitors []Node `json:"competitors"`
}
ContestFlow represents parallel execution with result selection. All competitors complete; a selector chooses the best result.
type Error ¶
type Error[T any] struct {
	Timestamp time.Time
	InputData T
	Err       error
	Path      []Identity
	Duration  time.Duration
	Timeout   bool
	Canceled  bool
}
Error provides rich context about pipeline execution failures. It wraps the underlying error with information about where and when the failure occurred, what data was being processed, and the complete path through the processing chain.
The Path field contains Identity values, enabling correlation between error paths and schema definitions via the Identity.ID() UUIDs.
func (*Error[T]) IsCanceled ¶
IsCanceled returns true if the error was caused by cancellation. This typically indicates intentional termination rather than failure, useful for distinguishing between errors that should trigger alerts versus expected shutdowns.
type Fallback ¶
type Fallback[T any] struct {
	// contains filtered or unexported fields
}
Fallback attempts processors in order, falling back to the next on error. Fallback provides automatic failover through a chain of alternative processors when earlier ones fail. This creates resilient processing chains that can recover from failures gracefully.
Unlike Retry which attempts the same operation multiple times, Fallback switches to completely different implementations. Each processor is tried in order until one succeeds or all fail.
Common use cases:
- Primary/backup/tertiary service failover
- Graceful degradation strategies
- Multiple payment provider support
- Cache miss handling (try local cache, then redis, then database)
- API version compatibility chains
Example:
fallback := pipz.NewFallback(pipz.NewIdentity("payment-providers", ""),
	stripeProcessor, // Try Stripe first
	paypalProcessor, // Fall back to PayPal on error
	squareProcessor, // Finally try Square
)
IMPORTANT: Avoid circular references between Fallback instances. Example of a DANGEROUS pattern:

fallback1 → fallback2 → fallback3 → fallback1

If every processor in the cycle fails, this recurses infinitely and overflows the stack.
func NewFallback ¶
NewFallback creates a new Fallback connector that tries processors in order. Each processor is tried in order until one succeeds or all fail. If no processors are provided, Process() will return an error.
Examples:
fallback := pipz.NewFallback(PaymentID, stripe, paypal, square)
fallback := pipz.NewFallback(CacheID, redis, database)
func (*Fallback[T]) AddFallback ¶
AddFallback appends a processor to the end of the fallback chain.
func (*Fallback[T]) Close ¶
Close gracefully shuts down the connector and all its child processors. Close is idempotent - multiple calls return the same result.
func (*Fallback[T]) GetFallback ¶
GetFallback returns the second processor (for backward compatibility). Returns nil if there's no second processor.
func (*Fallback[T]) GetPrimary ¶
GetPrimary returns the first processor (for backward compatibility).
func (*Fallback[T]) GetProcessors ¶
GetProcessors returns a copy of all processors in order.
func (*Fallback[T]) Process ¶
Process implements the Chainable interface. Tries each processor in order until one succeeds or all fail.
func (*Fallback[T]) Schema ¶
Schema returns a Node representing this connector in the pipeline schema.
func (*Fallback[T]) SetFallback ¶
SetFallback updates the second processor (for backward compatibility). If there's no second processor, adds one.
func (*Fallback[T]) SetPrimary ¶
SetPrimary updates the first processor (for backward compatibility).
func (*Fallback[T]) SetProcessors ¶
SetProcessors replaces all processors with the provided ones. If no processors are provided, Process() will return an error.
type FallbackFlow ¶
FallbackFlow represents primary/backup processing. The primary is tried first; if it fails, backups are tried in order.
type Filter ¶
type Filter[T any] struct {
	// contains filtered or unexported fields
}
Filter creates a conditional processor that either continues the pipeline unchanged or executes a processor based on a predicate function.
Filter provides a clean way to implement conditional processing without complex if-else logic scattered throughout your code. When the condition returns true, the processor is executed. When false, data passes through unchanged with no errors.
This is ideal for:
- Feature flags (process only for enabled users)
- A/B testing (apply changes to test group)
- Optional processing steps based on data state
- Business rules that apply to subset of data
- Conditional enrichment or validation
- Performance optimizations (skip expensive operations)
Unlike Switch which routes to different processors, Filter either processes or passes through. Unlike Mutate which only supports transformations that cannot fail, Filter can execute any Chainable including ones that may error.
Example - Feature flag processing:

enableNewFeature := pipz.NewFilter(pipz.NewIdentity("feature-flag", ""),
	func(ctx context.Context, user User) bool {
		return user.BetaEnabled && isFeatureEnabled(ctx, "new-algorithm")
	},
	newAlgorithmProcessor,
)

Example - Conditional validation:

validatePremium := pipz.NewFilter(pipz.NewIdentity("premium-validation", ""),
	func(ctx context.Context, order Order) bool {
		return order.CustomerTier == "premium"
	},
	pipz.NewSequence(pipz.NewIdentity("premium-checks", ""),
		validateCreditLimit,
		checkFraudScore,
		verifyIdentity,
	),
)
The Filter connector is thread-safe and can be safely used in concurrent scenarios. The condition function and processor can be updated at runtime for dynamic behavior.
func NewFilter ¶
func NewFilter[T any](identity Identity, condition func(context.Context, T) bool, processor Chainable[T]) *Filter[T]
NewFilter creates a new Filter connector with the given condition and processor. When condition returns true, processor is executed. When false, data passes through unchanged.
func (*Filter[T]) Close ¶
Close gracefully shuts down the connector and its child processor. Close is idempotent - multiple calls return the same result.
func (*Filter[T]) Condition ¶
Condition returns the current condition function. Note: this is the function reference itself, not a copy.
func (*Filter[T]) Process ¶
Process implements the Chainable interface. Evaluates the condition and either executes the processor or passes data through unchanged.
func (*Filter[T]) Schema ¶
Schema returns a Node representing this connector in the pipeline schema.
func (*Filter[T]) SetCondition ¶
SetCondition updates the condition function. This allows for dynamic behavior changes at runtime.
func (*Filter[T]) SetProcessor ¶
SetProcessor updates the processor to execute when condition is true. This allows for dynamic processor changes at runtime.
type FilterFlow ¶
type FilterFlow struct {
Processor Node `json:"processor"`
}
FilterFlow represents conditional processing. Items matching the predicate are processed; others pass through.
type Flow ¶
type Flow interface {
// Variant returns the type discriminator for this flow.
Variant() FlowVariant
}
Flow represents how children are organized within a connector node. Each connector type has its own Flow implementation that describes the semantic relationship between the connector and its children.
Leaf nodes (processors) have nil Flow since they have no children.
type FlowKey ¶
type FlowKey[T Flow] struct {
	// contains filtered or unexported fields
}
FlowKey provides bidirectional type-safe extraction for Flow types. This follows the pattern established in capitan for type-safe field extraction.
Example:
if seq, ok := SequenceKey.From(node); ok {
for _, step := range seq.Steps {
// process each step
}
}
func (FlowKey[T]) From ¶
From extracts the typed Flow from a Node. Returns the flow and true if the node's flow matches this key's type, or zero value and false otherwise.
func (FlowKey[T]) Variant ¶
func (k FlowKey[T]) Variant() FlowVariant
Variant returns the flow type this key extracts.
type FlowVariant ¶
type FlowVariant string
FlowVariant is a discriminator for the Flow interface implementation type. Used for runtime type identification when type assertions are needed.
const (
	// Connectors (have children).
	FlowVariantSequence       FlowVariant = "sequence"
	FlowVariantFallback       FlowVariant = "fallback"
	FlowVariantRace           FlowVariant = "race"
	FlowVariantContest        FlowVariant = "contest"
	FlowVariantConcurrent     FlowVariant = "concurrent"
	FlowVariantSwitch         FlowVariant = "switch"
	FlowVariantFilter         FlowVariant = "filter"
	FlowVariantHandle         FlowVariant = "handle"
	FlowVariantScaffold       FlowVariant = "scaffold"
	FlowVariantBackoff        FlowVariant = "backoff"
	FlowVariantRetry          FlowVariant = "retry"
	FlowVariantTimeout        FlowVariant = "timeout"
	FlowVariantRateLimiter    FlowVariant = "ratelimiter"
	FlowVariantCircuitBreaker FlowVariant = "circuitbreaker"
	FlowVariantWorkerpool     FlowVariant = "workerpool"
	FlowVariantPipeline       FlowVariant = "pipeline"

	// Processors (leaf nodes).
	FlowVariantApply     FlowVariant = "apply"
	FlowVariantTransform FlowVariant = "transform"
	FlowVariantEffect    FlowVariant = "effect"
	FlowVariantEnrich    FlowVariant = "enrich"
	FlowVariantMutate    FlowVariant = "mutate"
)
Flow variants for all pipeline node types.
type Handle ¶
type Handle[T any] struct {
	// contains filtered or unexported fields
}
Handle provides error observation and handling for processors. When the wrapped processor fails, Handle passes the error to an error handler for processing (e.g., logging, cleanup, notifications), then passes the original error through.
Common patterns:
- Log errors with additional context
- Clean up resources on failure (e.g., release inventory)
- Send notifications or alerts
- Collect metrics about failures
- Implement compensation logic
The error handler receives a Chainable[*Error[T]] with full error context, including the input data, error details, and processing path.
Example:

// Log errors with context
logged := pipz.NewHandle(
	pipz.NewIdentity("with-logging", ""),
	processOrder,
	pipz.Effect(pipz.NewIdentity("log", ""), func(ctx context.Context, err *pipz.Error[Order]) error {
		log.Printf("order %s failed: %v", err.InputData.ID, err.Err)
		return nil
	}),
)

// Clean up resources on failure
withCleanup := pipz.NewHandle(
	pipz.NewIdentity("inventory-cleanup", ""),
	reserveAndCharge,
	pipz.Effect(pipz.NewIdentity("release", ""), func(ctx context.Context, err *pipz.Error[Order]) error {
		if err.InputData.ReservationID != "" {
			inventory.Release(err.InputData.ReservationID)
		}
		return nil
	}),
)
func NewHandle ¶
func NewHandle[T any](identity Identity, processor Chainable[T], errorHandler Chainable[*Error[T]]) *Handle[T]
NewHandle creates a new Handle connector.
func (*Handle[T]) Close ¶
Close gracefully shuts down the connector and its child processors. Close is idempotent - multiple calls return the same result.
func (*Handle[T]) GetErrorHandler ¶
GetErrorHandler returns the current error handler.
func (*Handle[T]) GetProcessor ¶
GetProcessor returns the current main processor.
func (*Handle[T]) Schema ¶
Schema returns a Node representing this connector in the pipeline schema.
func (*Handle[T]) SetErrorHandler ¶
SetErrorHandler updates the error handler.
func (*Handle[T]) SetProcessor ¶
SetProcessor updates the main processor.
type HandleFlow ¶
type HandleFlow struct {
Processor Node `json:"processor"`
ErrorHandler Node `json:"error_handler"`
}
HandleFlow represents error observation and handling. The processor is wrapped; errors flow to the error handler.
type Identity ¶
type Identity struct {
// contains filtered or unexported fields
}
Identity provides rich metadata for processors and connectors. It replaces the simple Name type with structured identity information that supports debugging, visualization, and profiling.
Each Identity has an auto-generated UUID that uniquely identifies the processor or connector instance, enabling correlation between schema definitions and runtime signal events.
Example:
var (
ValidateOrderID = pipz.NewIdentity("validate-order", "Validates order structure")
EnrichCustomerID = pipz.NewIdentity("enrich-customer", "Adds customer details from CRM")
)
pipeline := pipz.NewSequence(PipelineID,
pipz.Apply(ValidateOrderID, validateOrder),
pipz.Enrich(EnrichCustomerID, enrichCustomer),
)
func NewIdentity ¶
NewIdentity creates a new Identity with an auto-generated UUID. The name should be a short, descriptive identifier (e.g., "validate-order"). The description provides additional context for debugging and documentation.
func (Identity) Description ¶
Description returns the optional description.
type Node ¶
type Node struct {
Identity Identity `json:"-"`
Type string `json:"type"`
Flow Flow `json:"flow,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
Node represents a node in the pipeline schema tree. It provides a serializable representation of the pipeline structure for visualization, debugging, and tooling.
The schema can be generated at build time via the Schema() method on any Chainable, providing a complete picture of the pipeline structure without executing it.
For connector nodes (Sequence, Fallback, etc.), the Flow field contains semantic child relationships. For processor nodes (Apply, Transform, etc.), Flow is nil since they are leaf nodes.
Example:
pipeline := pipz.NewSequence(PipelineID,
pipz.Apply(ValidateID, validate),
pipz.NewFallback(FallbackID,
pipz.Apply(PrimaryID, primary),
pipz.Apply(BackupID, backup),
),
)
schema := pipeline.Schema()
jsonBytes, _ := json.MarshalIndent(schema, "", " ")
fmt.Println(string(jsonBytes))
func (Node) MarshalJSON ¶
MarshalJSON implements json.Marshaler. It flattens the Identity into separate id, name, and description fields.
func (*Node) UnmarshalJSON ¶
UnmarshalJSON implements json.Unmarshaler. Note: The Identity UUID will be regenerated on unmarshal since the original UUID cannot be reconstructed from the JSON. Note: Flow unmarshaling is not supported - schemas are built from pipelines, not JSON.
type Pipeline ¶
type Pipeline[T any] struct {
	// contains filtered or unexported fields
}
Pipeline wraps a Chainable to define a semantic execution context. It provides correlation between runtime signals and pipeline identity by injecting execution and pipeline IDs into the context.
Each Process() call generates a unique execution ID, while the pipeline ID remains stable (derived from the Pipeline's identity). Nested connectors can extract these IDs to include in signals, enabling distributed tracing and observability.
Example:
// Define a semantic pipeline
orderPipeline := pipz.NewPipeline(
pipz.NewIdentity("order-processing", "Main order flow"),
pipz.NewSequence(internalID, validate, enrich, save),
)
// All signals from nested connectors get correlation IDs
result, err := orderPipeline.Process(ctx, order)
// In signal handlers, extract IDs for correlation
if execID, ok := pipz.ExecutionIDFromContext(ctx); ok {
log.Printf("Execution %s completed", execID)
}
func NewPipeline ¶
NewPipeline creates a Pipeline that wraps a Chainable with execution context. The identity defines the semantic pipeline name for correlation purposes, separate from the identities of the components within.
type PipelineFlow ¶
type PipelineFlow struct {
Root Node `json:"root"`
}
PipelineFlow represents a semantic execution context wrapper. It wraps a root chainable with execution tracking metadata.
type Processor ¶
type Processor[T any] struct {
	// contains filtered or unexported fields
}
Processor defines an identified processing stage that transforms a value of type T. It contains an Identity for debugging and visualization, and a private function that processes the value. The function receives a context for cancellation and timeout control.
Processor is the basic building block created by adapter functions like Apply, Transform, Effect, Mutate, and Enrich. The identity field is crucial for debugging, appearing in error messages and the Error[T].Path to identify exactly where failures occur. The identity's UUID enables correlation between schema definitions and runtime signal events.
The fn field is intentionally private to ensure processors are only created through the provided adapter functions, maintaining consistent error handling and path tracking.
Best practices for processor identities:
- Use descriptive, action-oriented names ("validate-email", not "email")
- Include the operation type ("parse-json", "fetch-user", "log-event")
- Keep names concise but meaningful
- Use descriptions to document the processor's purpose
- Identities appear in Error[T].Path for debugging
func Apply ¶
Apply creates a Processor from a function that transforms data and may return an error. Apply is the workhorse processor - use it when your transformation might fail due to validation, parsing, external API calls, or business rule violations.
The function receives a context for timeout/cancellation support. Long-running operations should check ctx.Err() periodically. On error, the pipeline stops immediately and returns the error wrapped with debugging context.
Apply is ideal for:
- Data validation with transformation
- API calls that return modified data
- Database lookups that enhance data
- Parsing operations that might fail
- Business rule enforcement
For pure transformations that can't fail, use Transform for better performance. For operations that should continue on failure, use Enrich.
Example:
var ParseJSONID = pipz.NewIdentity("parse-json", "Parses JSON input into Data struct")
parseJSON := pipz.Apply(ParseJSONID, func(ctx context.Context, raw string) (Data, error) {
var data Data
if err := json.Unmarshal([]byte(raw), &data); err != nil {
return Data{}, fmt.Errorf("invalid JSON: %w", err)
}
return data, nil
})
func Effect ¶
Effect creates a Processor that performs side effects without modifying the data. Effect is for operations that need to happen alongside your main processing flow, such as logging, metrics collection, notifications, or audit trails.
The function receives the data for inspection but must not modify it. Any returned error stops the pipeline immediately. The original data always passes through unchanged, making Effect perfect for:
- Logging important events or data states
- Recording metrics (counts, latencies, values)
- Sending notifications or alerts
- Writing audit logs for compliance
- Triggering external systems
- Validating without transformation
Unlike Apply, Effect cannot transform data. Unlike Transform, it can fail. This separation ensures side effects are explicit and testable.
Example:
var AuditPaymentID = pipz.NewIdentity("audit-payment", "Logs payment to audit trail")
auditLog := pipz.Effect(AuditPaymentID, func(ctx context.Context, payment Payment) error {
return auditLogger.Log(ctx, "payment_processed", map[string]any{
"amount": payment.Amount,
"user_id": payment.UserID,
"timestamp": time.Now(),
})
})
func Enrich ¶
Enrich creates a Processor that attempts to enhance data with additional information. Enrich is unique among processors - if the enrichment fails, it returns the original data unchanged rather than stopping the pipeline. This makes it ideal for optional enhancements that improve data quality but aren't critical for processing.
The enrichment function should fetch additional data and return an enhanced version. Common enrichment patterns include:
- Adding user details from a cache or database
- Geocoding addresses to add coordinates
- Fetching current prices or exchange rates
- Looking up metadata from external services
- Adding computed fields from external data
Use Enrich when the additional data is "nice to have" but not required. If the enrichment is mandatory, use Apply instead. Enrich swallows errors to ensure pipeline continuity, so consider logging failures within the enrichment function.
Example:
var AddCustomerNameID = pipz.NewIdentity("add-customer-name", "Enriches order with customer name from CRM")
addCustomerName := pipz.Enrich(AddCustomerNameID, func(ctx context.Context, order Order) (Order, error) {
customer, err := customerService.Get(ctx, order.CustomerID)
if err != nil {
// Log but don't fail - order processing continues without name
log.Printf("failed to enrich customer data: %v", err)
return order, err
}
order.CustomerName = customer.Name
return order, nil
})
func Mutate ¶
func Mutate[T any](identity Identity, transformer func(context.Context, T) T, condition func(context.Context, T) bool) Processor[T]
Mutate creates a Processor that conditionally transforms data based on a predicate. Mutate combines a condition check with a transformation, applying the transformer only when the condition returns true. When false, data passes through unchanged.
This pattern is cleaner than embedding if-statements in Transform functions and makes the condition explicit and testable. Use Mutate for:
- Feature flags (transform only for enabled users)
- A/B testing (apply changes to test group)
- Conditional formatting based on data values
- Environment-specific transformations
- Business rules that apply to a subset of data
The condition and transformer are separate functions for better testability and reusability. The transformer cannot fail - use Apply with conditional logic if you need error handling.
Example:
var PremiumDiscountID = pipz.NewIdentity("premium-discount", "Applies 10% discount for premium customers")
discountPremium := pipz.Mutate(PremiumDiscountID,
func(ctx context.Context, order Order) Order {
order.Total *= 0.9 // 10% discount
return order
},
func(ctx context.Context, order Order) bool {
return order.CustomerTier == "premium" && order.Total > 100
},
)
func Transform ¶
Transform creates a Processor that applies a pure transformation function to data. Transform is the simplest processor - use it when your operation always succeeds and always modifies the data in a predictable way.
The transformation function cannot fail, making Transform ideal for:
- Data formatting (uppercase, trimming, parsing that can't fail)
- Mathematical calculations that can't error
- Field mapping or restructuring
- Adding computed fields
If your transformation might fail (e.g., parsing, validation), use Apply instead. If you need conditional transformation, use Mutate.
Example:
var UppercaseID = pipz.NewIdentity("uppercase", "Converts text to uppercase")
uppercase := pipz.Transform(UppercaseID, func(ctx context.Context, s string) string {
return strings.ToUpper(s)
})
func (Processor[T]) Identity ¶
Identity returns the identity of the processor for debugging and error reporting.
func (Processor[T]) Process ¶
Process implements the Chainable interface, allowing individual processors to be used directly or composed in connectors.
This means a single Processor can be used anywhere a Chainable is expected:
validator := pipz.Effect(ValidateID, validateFunc)

// Can be used directly
result, err := validator.Process(ctx, data)

// Or in connectors
pipeline := pipz.NewSequence(PipelineID, validator, transformer)
type Race ¶
type Race[T Cloner[T]] struct { // contains filtered or unexported fields }
Race runs all processors in parallel and returns the result of the first to complete successfully. Race implements competitive processing where speed matters more than which specific processor succeeds. The first successful result wins and cancels all other processors.
Context handling: Race uses context.WithCancel(ctx) to create a derived context that preserves all parent context values (including trace IDs) while allowing cancellation of losing processors when a winner is found.
The input type T must implement the Cloner[T] interface to provide efficient, type-safe copying without reflection. This ensures predictable performance and allows types to control their own copying semantics.
This pattern excels when you have multiple ways to get the same result and want the fastest one:
- Querying multiple replicas or regions
- Trying different algorithms with varying performance
- Fetching from multiple caches
- Calling primary and backup services simultaneously
- Any scenario where latency matters more than specific source
Key behaviors:
- First success wins and cancels others
- If all processors fail, the last error is returned
- Each processor gets an isolated copy via Clone()
- Useful for reducing p99 latencies
- Can increase load (all processors run)
Example:
// UserQuery must implement Cloner[UserQuery]
var FetchUserID = pipz.NewIdentity("fetch-user", "Races user fetch across sources")
race := pipz.NewRace(FetchUserID,
    fetchFromLocalCache,
    fetchFromRegionalCache,
    fetchFromDatabase,
)
func (*Race[T]) Close ¶
Close gracefully shuts down the connector and all its child processors. Close is idempotent - multiple calls return the same result.
func (*Race[T]) SetProcessors ¶
SetProcessors replaces all processors atomically.
type RaceFlow ¶
type RaceFlow struct {
Competitors []Node `json:"competitors"`
}
RaceFlow represents parallel execution where first success wins. All competitors start simultaneously; first to succeed cancels others.
type RateLimiter ¶
type RateLimiter[T any] struct { // contains filtered or unexported fields }
RateLimiter controls the rate of processing to protect downstream services. RateLimiter wraps a processor and uses a token bucket algorithm to enforce rate limits, allowing controlled bursts while maintaining a steady average rate. This is essential for protecting external APIs, databases, and other rate-sensitive resources.
CRITICAL: RateLimiter is a STATEFUL connector that maintains an internal token bucket. Create it once and reuse it - do NOT create a new RateLimiter for each request, as that would reset the token bucket and rate limiting would not work.
❌ WRONG - Creating per request (useless):
func handleRequest(ctx context.Context, req Request) (Request, error) {
    limiter := pipz.NewRateLimiter(LimiterID, 100, 10, apiCall) // NEW limiter each time!
    return limiter.Process(ctx, req) // Always allows through (fresh bucket)
}
✅ RIGHT - Create once, reuse:
var apiLimiter = pipz.NewRateLimiter(LimiterID, 100, 10, apiCall) // Shared instance

func handleRequest(ctx context.Context, req Request) (Request, error) {
    return apiLimiter.Process(ctx, req) // Actually rate limits
}
The limiter operates in two modes:
- "wait": Blocks until a token is available (default)
- "drop": Returns an error immediately if no tokens available
RateLimiter is particularly useful for:
- API client implementations with rate limits
- Database connection throttling
- Preventing overwhelming downstream services
- Implementing fair resource sharing
- Meeting SLA requirements
Best Practices:
- Use const names for all processors/connectors (see best-practices.md)
- Create RateLimiters once and reuse them (e.g., as struct fields or package variables)
- Configure limits based on actual downstream capacity
- Layer multiple limiters for complex scenarios (global → service → endpoint)
Example:
var (
APILimiterID = pipz.NewIdentity("api-limiter", "Rate limits Stripe API calls")
ChargeID = pipz.NewIdentity("charge", "Process payment charge")
)
// Create limiter wrapping the API call
var stripeLimiter = pipz.NewRateLimiter(APILimiterID, 100, 10,
pipz.Apply(ChargeID, processStripeCharge),
)
// Use in pipeline
func createPaymentPipeline() pipz.Chainable[Payment] {
return pipz.NewSequence(PipelineID,
validatePayment,
stripeLimiter, // Rate-limited API call
confirmPayment,
)
}
func NewRateLimiter ¶
func NewRateLimiter[T any](identity Identity, ratePerSecond float64, burst int, processor Chainable[T]) *RateLimiter[T]
NewRateLimiter creates a new RateLimiter connector wrapping the given processor. The ratePerSecond parameter sets the sustained rate limit. The burst parameter sets the maximum burst size.
func (*RateLimiter[T]) Close ¶
func (r *RateLimiter[T]) Close() error
Close gracefully shuts down the connector and its wrapped processor. Close is idempotent - multiple calls return the same result.
func (*RateLimiter[T]) GetAvailableTokens ¶
func (r *RateLimiter[T]) GetAvailableTokens() float64
GetAvailableTokens returns the current number of available tokens. This method is primarily intended for testing and debugging.
func (*RateLimiter[T]) GetBurst ¶
func (r *RateLimiter[T]) GetBurst() int
GetBurst returns the current burst capacity.
func (*RateLimiter[T]) GetMode ¶
func (r *RateLimiter[T]) GetMode() string
GetMode returns the current mode ("wait" or "drop").
func (*RateLimiter[T]) GetRate ¶
func (r *RateLimiter[T]) GetRate() float64
GetRate returns the current rate limit.
func (*RateLimiter[T]) Identity ¶
func (r *RateLimiter[T]) Identity() Identity
Identity returns the identity of this connector.
func (*RateLimiter[T]) Process ¶
func (r *RateLimiter[T]) Process(ctx context.Context, data T) (result T, err error)
Process implements the Chainable interface.
func (*RateLimiter[T]) Schema ¶
func (r *RateLimiter[T]) Schema() Node
Schema returns a Node representing this connector in the pipeline schema.
func (*RateLimiter[T]) SetBurst ¶
func (r *RateLimiter[T]) SetBurst(burst int) *RateLimiter[T]
SetBurst updates the burst capacity.
func (*RateLimiter[T]) SetMode ¶
func (r *RateLimiter[T]) SetMode(mode string) *RateLimiter[T]
SetMode sets the rate limiting mode ("wait" or "drop").
func (*RateLimiter[T]) SetRate ¶
func (r *RateLimiter[T]) SetRate(ratePerSecond float64) *RateLimiter[T]
SetRate updates the rate limit (requests per second).
func (*RateLimiter[T]) WithClock ¶
func (r *RateLimiter[T]) WithClock(clock clockz.Clock) *RateLimiter[T]
WithClock sets the clock implementation for testing purposes. This method is primarily intended for testing with FakeClock.
type RateLimiterFlow ¶
type RateLimiterFlow struct {
Processor Node `json:"processor"`
}
RateLimiterFlow represents processing with rate limiting.
func (RateLimiterFlow) Variant ¶
func (RateLimiterFlow) Variant() FlowVariant
Variant implements Flow.
type Retry ¶
type Retry[T any] struct { // contains filtered or unexported fields }
Retry attempts the processor up to maxAttempts times. Retry provides simple retry logic for operations that may fail transiently. It immediately retries on failure without delay, making it suitable for quick operations or when failures are expected to clear immediately.
Each retry uses the same input data. Context cancellation is checked between attempts to allow for early termination. If all attempts fail, the last error is returned with attempt count information for debugging.
Use Retry for:
- Network calls with transient failures
- Database operations during brief contentions
- File operations with temporary locks
- Any operation with intermittent failures
For operations needing delay between retries, use RetryWithBackoff. For trying different approaches, use Fallback instead.
Example:
var RetryID = pipz.NewIdentity("retry-db", "Retries database write")
retry := pipz.NewRetry(
RetryID,
databaseWriter,
3, // Try up to 3 times
)
func (*Retry[T]) Close ¶
Close gracefully shuts down the connector and its child processor. Close is idempotent - multiple calls return the same result.
func (*Retry[T]) GetMaxAttempts ¶
GetMaxAttempts returns the current maximum attempts setting.
func (*Retry[T]) SetMaxAttempts ¶
SetMaxAttempts updates the maximum number of retry attempts.
type RetryFlow ¶
type RetryFlow struct {
Processor Node `json:"processor"`
}
RetryFlow represents processing with simple retry logic.
type Scaffold ¶
type Scaffold[T Cloner[T]] struct { // contains filtered or unexported fields }
Scaffold runs all processors in parallel with context isolation for true fire-and-forget behavior. Unlike Concurrent, Scaffold uses context.WithoutCancel to ensure processors continue running even if the parent context is canceled. This is ideal for operations that must complete regardless of the main pipeline's state.
The input type T must implement the Cloner[T] interface to provide efficient, type-safe copying without reflection. This ensures predictable performance and allows types to control their own copying semantics.
Use Scaffold when you need:
- True fire-and-forget operations that outlive the request
- Background tasks that shouldn't be canceled with the main flow
- Cleanup or logging operations that must complete
- Non-critical side effects that shouldn't block the pipeline
Common use cases:
- Asynchronous audit logging
- Background cache warming
- Non-critical notifications
- Metrics collection
- Cleanup tasks that should complete independently
Important characteristics:
- Input type must implement Cloner[T] interface
- Processors continue even after parent context cancellation
- Returns immediately without waiting for completion
- Original input always returned unchanged
- No error reporting from background processors
- Trace context is preserved (but cancellation is not)
Example:
var AsyncOpsID = pipz.NewIdentity("async-operations", "Fire-and-forget side effects")
scaffold := pipz.NewScaffold(AsyncOpsID,
    asyncAuditLog,
    warmCache,
    collectMetrics,
)
// Returns immediately, processors run in background
result, err := scaffold.Process(ctx, order)
func NewScaffold ¶
NewScaffold creates a new Scaffold connector.
func (*Scaffold[T]) Close ¶
Close gracefully shuts down the connector and all its child processors. Close is idempotent - multiple calls return the same result.
func (*Scaffold[T]) Schema ¶
Schema returns a Node representing this connector in the pipeline schema.
func (*Scaffold[T]) SetProcessors ¶
SetProcessors replaces all processors atomically.
type ScaffoldFlow ¶
type ScaffoldFlow struct {
Processors []Node `json:"processors"`
}
ScaffoldFlow represents fire-and-forget parallel execution. Processors run asynchronously without blocking the main flow.
type Schema ¶
type Schema struct {
Root Node `json:"root"`
}
Schema represents a complete pipeline schema. It wraps the root Node and provides utilities for traversal and serialization.
func (Schema) FindByName ¶
FindByName returns the first node with the given name, or nil if not found.
func (Schema) FindByType ¶
FindByType returns all nodes of the given type.
type Sequence ¶
type Sequence[T any] struct { // contains filtered or unexported fields }
Sequence provides a type-safe sequence for processing values of type T. It maintains an ordered list of processors that are executed sequentially.
Sequence offers a rich API with methods to dynamically modify the processor chain. This makes it ideal for scenarios where the processing steps need to be configured at runtime or modified based on conditions.
Key features:
- Thread-safe for concurrent access
- Dynamic modification of processor chain
- Identity-based processors for debugging and visualization
- Rich API for reordering and modification
- Fail-fast execution with detailed errors
Sequence is the primary way to chain processors together.
func NewSequence ¶
NewSequence creates a new Sequence with optional initial processors. The sequence is ready to use immediately and can be safely accessed concurrently. Additional processors can be added using Register or the various modification methods.
Example:
// Single line declaration
var (
UserProcessingID = pipz.NewIdentity("user-processing", "Main user processing pipeline")
ValidateID = pipz.NewIdentity("validate", "Validates user input")
EnrichID = pipz.NewIdentity("enrich", "Enriches user with external data")
AuditID = pipz.NewIdentity("audit", "Logs user actions for audit")
)
sequence := pipz.NewSequence(UserProcessingID,
pipz.Effect(ValidateID, validateUser),
pipz.Apply(EnrichID, enrichUser),
pipz.Effect(AuditID, auditUser),
)
// Or create empty and add later
sequence := pipz.NewSequence[User](UserProcessingID)
sequence.Register(pipz.Effect(ValidateID, validateUser), pipz.Apply(EnrichID, enrichUser))
func (*Sequence[T]) After ¶
After inserts processors after the first processor with the specified identity.
func (*Sequence[T]) Before ¶
Before inserts processors before the first processor with the specified identity.
func (*Sequence[T]) Clear ¶
func (c *Sequence[T]) Clear()
Clear removes all processors from the Sequence.
func (*Sequence[T]) Close ¶
Close gracefully shuts down the connector and all its child processors. Processors are closed in reverse order (LIFO) to mirror typical resource cleanup patterns. Close is idempotent - multiple calls return the same result.
func (*Sequence[T]) Process ¶
Process executes all registered processors on the input value. Each processor receives the output of the previous processor. The context is checked before each processor execution - if the context is canceled or expired, processing stops immediately. If any processor returns an error, execution stops and an Error is returned with rich debugging information.
Process is thread-safe and can be called concurrently. The sequence's processor list is locked during execution to prevent modifications.
Error handling includes:
- Processor name and stage index for debugging
- Original input data that caused the failure
- Execution duration for performance analysis
- Timeout/cancellation detection
Context best practices:
- Always use a context with a timeout in production
- Check ctx.Err() in long-running processors
- Pass context through to external calls
func (*Sequence[T]) Register ¶
Register adds processors to this Sequence. Processors are executed in the order they are registered.
This method is thread-safe and can be called concurrently. New processors are appended to the existing chain, making Register ideal for building sequences incrementally:
var OrderProcessingID = pipz.NewIdentity("order-processing", "")
sequence := pipz.NewSequence[Order](OrderProcessingID)
sequence.Register(validateOrder)
sequence.Register(calculateTax, applyDiscount)
if config.RequiresApproval {
sequence.Register(requireApproval)
}
func (*Sequence[T]) Schema ¶
Schema returns a Node representing this sequence in the pipeline schema.
type SequenceFlow ¶
type SequenceFlow struct {
Steps []Node `json:"steps"`
}
SequenceFlow represents an ordered sequence of processing steps. Each step is executed in order, with the output of each step becoming the input of the next.
type Switch ¶
type Switch[T any] struct { // contains filtered or unexported fields }
Switch routes to different processors based on condition result. Switch enables conditional processing where the path taken depends on the input data. The condition function examines the data and returns a route key that determines which processor to use.
If no route exists for the returned key, the input passes through unchanged.
Switch is perfect for:
- Status-based workflows with defined states
- Region-specific logic
- Priority handling
- A/B testing with experiment names
- Dynamic routing tables that change at runtime
- Feature flag controlled processing paths
Example:
const (
RouteStandard = "standard"
RouteHighValue = "high_value"
RouteCrypto = "crypto"
)
var SwitchID = pipz.NewIdentity("payment-router", "Routes payments by amount and method")
router := pipz.NewSwitch(SwitchID,
    func(ctx context.Context, p Payment) string {
        if p.Amount > 10000 {
            return RouteHighValue
        }
        if p.Method == "crypto" {
            return RouteCrypto
        }
        return RouteStandard
    },
)
router.AddRoute(RouteStandard, standardProcessor)
router.AddRoute(RouteHighValue, highValueProcessor)
router.AddRoute(RouteCrypto, cryptoProcessor)
func (*Switch[T]) ClearRoutes ¶
ClearRoutes removes all routes from the switch.
func (*Switch[T]) Close ¶
Close gracefully shuts down the connector and all its route processors. Close is idempotent - multiple calls return the same result.
func (*Switch[T]) Process ¶
Process implements the Chainable interface. If no route matches the condition result, the input is returned unchanged.
func (*Switch[T]) RemoveRoute ¶
RemoveRoute removes a route from the switch.
func (*Switch[T]) Schema ¶
Schema returns a Node representing this connector in the pipeline schema.
func (*Switch[T]) SetCondition ¶
SetCondition updates the condition function.
type SwitchFlow ¶
SwitchFlow represents conditional routing to different processors. The condition determines which route key to use.
type Timeout ¶
type Timeout[T any] struct { // contains filtered or unexported fields }
Timeout enforces a timeout on the processor's execution. Timeout wraps any processor with a hard time limit, ensuring operations complete within acceptable bounds. If the timeout expires, the operation is canceled via context and a timeout error is returned.
This connector is critical for:
- Preventing hung operations
- Meeting SLA requirements
- Protecting against slow external services
- Ensuring predictable system behavior
- Resource management in concurrent systems
The wrapped operation should respect context cancellation for immediate termination. Operations that ignore context may continue running in the background even after timeout.
Timeout is often combined with Retry for robust error handling:
pipz.NewRetry(RetryID, pipz.NewTimeout(TimeoutID, operation, 5*time.Second), 3)
Example:
var TimeoutID = pipz.NewIdentity("user-service-timeout", "Enforces 2s timeout")
timeout := pipz.NewTimeout(
TimeoutID,
userServiceCall,
2*time.Second, // Must complete within 2 seconds
)
func NewTimeout ¶
func NewTimeout[T any](identity Identity, processor Chainable[T], duration time.Duration) *Timeout[T]
NewTimeout creates a new Timeout connector.
func (*Timeout[T]) Close ¶
Close gracefully shuts down the timeout connector and its child processor. Close is idempotent - multiple calls return the same result.
func (*Timeout[T]) GetDuration ¶
GetDuration returns the current timeout duration.
func (*Timeout[T]) Schema ¶
Schema returns a Node representing this connector in the pipeline schema.
func (*Timeout[T]) SetDuration ¶
SetDuration updates the timeout duration.
type TimeoutFlow ¶
type TimeoutFlow struct {
Processor Node `json:"processor"`
}
TimeoutFlow represents processing with a timeout constraint.
type WorkerPool ¶
type WorkerPool[T Cloner[T]] struct { // contains filtered or unexported fields }
WorkerPool provides bounded parallel execution with a fixed number of workers. It uses a semaphore to limit concurrent processor execution while maintaining the same API and behavior as other connectors in the pipz ecosystem.
The input type T must implement Cloner[T] to provide safe concurrent processing. Each processor receives an isolated copy of the input data.
Example:
var (
    APICallsID = pipz.NewIdentity("api-calls", "Bounded parallel API calls")
    ServiceAID = pipz.NewIdentity("service-a", "")
    ServiceBID = pipz.NewIdentity("service-b", "")
    ServiceCID = pipz.NewIdentity("service-c", "")
)
pool := pipz.NewWorkerPool(APICallsID, 5,
    pipz.Apply(ServiceAID, callServiceA),
    pipz.Apply(ServiceBID, callServiceB),
    pipz.Apply(ServiceCID, callServiceC),
)
func NewWorkerPool ¶
func NewWorkerPool[T Cloner[T]](identity Identity, workers int, processors ...Chainable[T]) *WorkerPool[T]
NewWorkerPool creates a WorkerPool with the specified worker count. The workers parameter controls the maximum number of concurrently executing processors (semaphore slots).
func (*WorkerPool[T]) Add ¶
func (w *WorkerPool[T]) Add(processor Chainable[T]) *WorkerPool[T]
Add appends a processor to the worker pool execution list.
func (*WorkerPool[T]) Clear ¶
func (w *WorkerPool[T]) Clear() *WorkerPool[T]
Clear removes all processors from the worker pool execution list.
func (*WorkerPool[T]) Close ¶
func (w *WorkerPool[T]) Close() error
Close gracefully shuts down the worker pool and all its child processors. Close is idempotent - multiple calls return the same result.
func (*WorkerPool[T]) GetActiveWorkers ¶
func (w *WorkerPool[T]) GetActiveWorkers() int
GetActiveWorkers returns the number of currently active workers.
func (*WorkerPool[T]) GetWorkerCount ¶
func (w *WorkerPool[T]) GetWorkerCount() int
GetWorkerCount returns the maximum number of concurrent workers.
func (*WorkerPool[T]) Identity ¶
func (w *WorkerPool[T]) Identity() Identity
Identity returns the identity of this connector.
func (*WorkerPool[T]) Process ¶
func (w *WorkerPool[T]) Process(ctx context.Context, input T) (result T, err error)
Process implements the Chainable interface.
func (*WorkerPool[T]) Remove ¶
func (w *WorkerPool[T]) Remove(index int) error
Remove removes the processor at the specified index.
func (*WorkerPool[T]) Schema ¶
func (w *WorkerPool[T]) Schema() Node
Schema returns a Node representing this connector in the pipeline schema.
func (*WorkerPool[T]) SetProcessors ¶
func (w *WorkerPool[T]) SetProcessors(processors ...Chainable[T]) *WorkerPool[T]
SetProcessors replaces all processors atomically.
func (*WorkerPool[T]) SetWorkerCount ¶
func (w *WorkerPool[T]) SetWorkerCount(workers int) *WorkerPool[T]
SetWorkerCount adjusts the worker pool size by recreating the semaphore.
func (*WorkerPool[T]) WithClock ¶
func (w *WorkerPool[T]) WithClock(clock clockz.Clock) *WorkerPool[T]
WithClock sets a custom clock for testing.
func (*WorkerPool[T]) WithTimeout ¶
func (w *WorkerPool[T]) WithTimeout(timeout time.Duration) *WorkerPool[T]
WithTimeout sets per-task timeout. Each processor must complete within this duration.
type WorkerpoolFlow ¶
type WorkerpoolFlow struct {
Processors []Node `json:"processors"`
}
WorkerpoolFlow represents processing distributed across a worker pool.
func (WorkerpoolFlow) Variant ¶
func (WorkerpoolFlow) Variant() FlowVariant
Variant implements Flow.