streamz

v1.0.7
Published: Mar 20, 2026 License: MIT Imports: 12 Imported by: 0

README

streamz


Type-safe stream processing primitives for Go channels.

Build composable pipelines from simple parts — batching, windowing, flow control — with unified error handling and deterministic testing.

One Channel, Every Pattern

Every processor shares the same signature:

Process(ctx context.Context, in <-chan Result[In]) <-chan Result[Out]

Result[T] unifies success and error in a single channel — no dual-channel complexity:

// Filter keeps items matching a predicate
filter := streamz.NewFilter(func(o Order) bool { return o.Total > 0 })

// Mapper transforms items
mapper := streamz.NewMapper(func(ctx context.Context, o Order) (Order, error) {
    o.ProcessedAt = time.Now()
    return o, nil
})

// Batcher collects items by size or time
batcher := streamz.NewBatcher[Order](streamz.BatchConfig{
    MaxSize:    100,
    MaxLatency: time.Second,
}, streamz.RealClock)

Compose them into pipelines — each output feeds the next input:

filtered := filter.Process(ctx, orders)
mapped := mapper.Process(ctx, filtered)
batched := batcher.Process(ctx, mapped)

for batch := range batched {
    if batch.IsSuccess() {
        bulkInsert(batch.Value())
    }
}

One interface. One channel. Every pattern.

Install

go get github.com/zoobz-io/streamz

Requires Go 1.24+.

Quick Start

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/zoobz-io/streamz"
)

type Order struct {
    ID          string
    Total       float64
    ProcessedAt time.Time
}

func main() {
    ctx := context.Background()

    // Source channel
    orders := make(chan streamz.Result[Order], 10)
    go func() {
        defer close(orders)
        orders <- streamz.NewSuccess(Order{ID: "A", Total: 99.99})
        orders <- streamz.NewSuccess(Order{ID: "B", Total: 149.99})
        orders <- streamz.NewSuccess(Order{ID: "C", Total: 0}) // will be filtered
    }()

    // Build processors
    filter := streamz.NewFilter(func(o Order) bool { return o.Total > 0 })
    mapper := streamz.NewMapper(func(ctx context.Context, o Order) (Order, error) {
        o.ProcessedAt = time.Now()
        return o, nil
    })

    // Compose pipeline
    filtered := filter.Process(ctx, orders)
    processed := mapper.Process(ctx, filtered)

    // Consume results
    for result := range processed {
        if result.IsSuccess() {
            o := result.Value()
            fmt.Printf("Processed: %s ($%.2f) at %v\n", o.ID, o.Total, o.ProcessedAt)
        }
    }
}

Capabilities

Feature                Description                                         Docs
Result[T] Pattern      Unified success/error handling in a single channel  Concepts
Processor Interface    Common pattern for all stream operations            Processors
Batching & Windowing   Time and size-based aggregation                     Architecture
Flow Control           Throttle, debounce, buffer, backpressure            Backpressure
Deterministic Testing  Clock abstraction for reproducible tests            Testing
Error Handling         Skip, retry, dead letter queues                     Error Handling

Why streamz?

  • Type-safe — Full compile-time checking with Go generics
  • Composable — Complex pipelines from simple, reusable parts
  • Unified errors — Result[T] eliminates dual-channel complexity
  • Deterministic testing — Clock abstraction enables reproducible time-based tests
  • Production ready — Proper channel lifecycle, no goroutine leaks
  • Minimal dependencies — Standard library plus clockz

Composable Stream Architecture

streamz enables a pattern: define processors once, compose them into any pipeline.

Your stream operations become reusable building blocks. Validation, enrichment, batching, rate limiting — each is a processor. Combine them in different configurations for different use cases.

// Reusable processors
validate := streamz.NewFilter(isValid)
enrich := streamz.NewAsyncMapper(fetchMetadata).WithWorkers(10)
batch := streamz.NewBatcher[Order](streamz.BatchConfig{MaxSize: 100})
throttle := streamz.NewThrottle[[]Order](100 * time.Millisecond)

// Real-time pipeline
realtime := enrich.Process(ctx, validate.Process(ctx, orders))

// Batch pipeline with rate limiting
batched := throttle.Process(ctx, batch.Process(ctx, enrich.Process(ctx, validate.Process(ctx, orders))))

Time-dependent processors use clockz for deterministic testing — advance time explicitly, verify behavior reproducibly.

Documentation

Full documentation is available in the docs/ directory:

  • Learn
  • Guides
  • Cookbook — Recipes with complete examples for common scenarios
  • Reference

Contributing

See CONTRIBUTING.md for guidelines. Run make help for available commands.

License

MIT License — see LICENSE for details.

Documentation

Overview

Package streamz provides type-safe, composable stream processing primitives that work with Go channels, enabling real-time data processing through batching, windowing, and other streaming operations.

The core abstraction uses the Result[T] pattern which unifies success and error cases into a single channel. This eliminates dual-channel complexity while providing explicit error handling and better monitoring capabilities.

Basic usage:

ctx := context.Background()
source := make(chan Result[int])

// Create a simple processing pipeline
fanin := streamz.NewFanIn[int]()

// Process returns a single Result[T] channel
results := fanin.Process(ctx, source)

// Handle both success and error cases from single channel
for result := range results {
	if result.IsError() {
		log.Printf("Processing error: %v", result.Error())
	} else {
		fmt.Printf("Got item: %v\n", result.Value())
	}
}

The package provides various processors for common streaming patterns:

  • Batching and unbatching
  • Windowing (tumbling, sliding, session)
  • Buffering with different strategies
  • Filtering and mapping
  • Fan-in and fan-out
  • Rate limiting and flow control
  • Deduplication
  • Monitoring and observability

Package streamz provides streaming data processing capabilities, including time-related operations for deterministic testing.

Index

Examples

Constants

const (
	MetadataPartitionIndex    = "partition_index"    // int - target partition [0, N)
	MetadataPartitionTotal    = "partition_total"    // int - total partition count N
	MetadataPartitionStrategy = "partition_strategy" // string - "hash", "round_robin", or "error"
)

Standard partition metadata keys for tracing and debugging.

const (
	MetadataWindowStart = "window_start" // time.Time - window start time
	MetadataWindowEnd   = "window_end"   // time.Time - window end time
	MetadataWindowType  = "window_type"  // string - "tumbling", "sliding", "session"
	MetadataWindowSize  = "window_size"  // time.Duration - window duration
	MetadataWindowSlide = "window_slide" // time.Duration - slide interval (sliding only)
	MetadataWindowGap   = "window_gap"   // time.Duration - activity gap (session only)
	MetadataSessionKey  = "session_key"  // string - session identifier (session only)
	MetadataSource      = "source"       // string - data source identifier
	MetadataTimestamp   = "timestamp"    // time.Time - processing timestamp
	MetadataProcessor   = "processor"    // string - processor that added metadata
	MetadataRetryCount  = "retry_count"  // int - number of retries attempted
	MetadataSessionID   = "session_id"   // string - session identifier
)

Standard metadata keys for common use cases.

Variables

This section is empty.

Functions

func IsInWindow

func IsInWindow[T any](result Result[T], timestamp time.Time) (bool, error)

IsInWindow checks if a timestamp falls within the Result's window.

func WindowDuration

func WindowDuration[T any](result Result[T]) (time.Duration, error)

WindowDuration returns the actual window duration.

Types

type AsyncMapper

type AsyncMapper[In, Out any] struct {
	// contains filtered or unexported fields
}

AsyncMapper processes items concurrently using multiple worker goroutines. It supports both ordered processing (preserving input sequence) and unordered processing (emitting results as they complete). This enables parallelization of CPU-intensive or I/O-bound operations while maintaining flexibility in ordering requirements.

Example (Ordered)

Example demonstrates ordered concurrent processing.

ctx := context.Background()

// Simulate API enrichment with preserved order
type User struct {
	ID   int
	Name string
}

type EnrichedUser struct {
	ID      int
	Name    string
	Profile string
}

// Create ordered async mapper
enricher := NewAsyncMapper(func(_ context.Context, u User) (EnrichedUser, error) {
	// Simulate API call - processing times vary but order is preserved
	return EnrichedUser{
		ID:      u.ID,
		Name:    u.Name,
		Profile: fmt.Sprintf("Profile for %s", u.Name),
	}, nil
}).WithWorkers(3).WithOrdered(true)

// Create input stream
users := make(chan Result[User])
go func() {
	defer close(users)
	users <- NewSuccess(User{ID: 1, Name: "Alice"})
	users <- NewSuccess(User{ID: 2, Name: "Bob"})
	users <- NewSuccess(User{ID: 3, Name: "Carol"})
}()

// Process with preserved order
enriched := enricher.Process(ctx, users)
for result := range enriched {
	if result.IsError() {
		fmt.Printf("Error: %v\n", result.Error())
	} else {
		user := result.Value()
		fmt.Printf("User %d: %s - %s\n", user.ID, user.Name, user.Profile)
	}
}
Output:
User 1: Alice - Profile for Alice
User 2: Bob - Profile for Bob
User 3: Carol - Profile for Carol
Example (Unordered)

Example demonstrates unordered concurrent processing for maximum throughput.

ctx := context.Background()

// Create unordered async mapper for maximum throughput
processor := NewAsyncMapper(func(_ context.Context, i int) (int, error) {
	// Simulate CPU-intensive work
	return i * i, nil
}).WithWorkers(4).WithOrdered(false)

// Create input stream
numbers := make(chan Result[int])
go func() {
	defer close(numbers)
	for i := 1; i <= 5; i++ {
		numbers <- NewSuccess(i)
	}
}()

// Process without order preservation
squares := processor.Process(ctx, numbers)
results := make([]int, 0, 5)
for result := range squares {
	if result.IsError() {
		fmt.Printf("Error: %v\n", result.Error())
	} else {
		results = append(results, result.Value())
	}
}

// Sort for consistent output (order may vary in real usage)
sort.Ints(results)
fmt.Printf("Squares: %v\n", results)
Output:
Squares: [1 4 9 16 25]

func NewAsyncMapper

func NewAsyncMapper[In, Out any](fn func(context.Context, In) (Out, error)) *AsyncMapper[In, Out]

NewAsyncMapper creates a processor that executes transformations concurrently. By default, it preserves input order and uses runtime.NumCPU() workers. Use the fluent API to configure behavior like worker count and ordering.

When to use:

  • CPU-intensive transformations (image processing, encryption)
  • I/O-bound operations (API calls, database queries)
  • Parallel enrichment while optionally maintaining sequence
  • Speeding up independent transformations
  • Rate-limited API calls with concurrent workers

Example:

// Parallel API enrichment with preserved order
enricher := streamz.NewAsyncMapper(func(ctx context.Context, id string) (User, error) {
	// Each API call happens in parallel
	return fetchUserFromAPI(ctx, id)
})

// Custom worker count for rate-limited APIs
enricher := streamz.NewAsyncMapper(func(ctx context.Context, id string) (User, error) {
	return fetchUserFromAPI(ctx, id)
}).WithWorkers(10)

// Unordered processing for maximum throughput
processor := streamz.NewAsyncMapper(func(ctx context.Context, img Image) (Thumbnail, error) {
	return generateThumbnail(ctx, img)
}).WithOrdered(false).WithWorkers(runtime.NumCPU())

results := processor.Process(ctx, input)
for result := range results {
	if result.IsError() {
		log.Printf("Processing error: %v", result.Error())
	} else {
		fmt.Printf("Result: %+v\n", result.Value())
	}
}

Parameters:

  • fn: Transformation function that can be safely executed concurrently

Returns a new AsyncMapper processor with fluent configuration.

func (*AsyncMapper[In, Out]) Name

func (a *AsyncMapper[In, Out]) Name() string

Name returns the processor name for debugging and monitoring.

func (*AsyncMapper[In, Out]) Process

func (a *AsyncMapper[In, Out]) Process(ctx context.Context, in <-chan Result[In]) <-chan Result[Out]

Process transforms input items concurrently across multiple workers. In ordered mode, output maintains input sequence despite variable processing times. In unordered mode, results are emitted as they complete for maximum throughput. Errors are wrapped in StreamError with original item context.

func (*AsyncMapper[In, Out]) WithBufferSize

func (a *AsyncMapper[In, Out]) WithBufferSize(size int) *AsyncMapper[In, Out]

WithBufferSize sets the reorder buffer size for ordered processing. This controls memory usage when processing times vary significantly. Only affects ordered mode. Defaults to 100.

func (*AsyncMapper[In, Out]) WithName

func (a *AsyncMapper[In, Out]) WithName(name string) *AsyncMapper[In, Out]

WithName sets a custom name for this processor. If not set, defaults to "async-mapper".

func (*AsyncMapper[In, Out]) WithOrdered

func (a *AsyncMapper[In, Out]) WithOrdered(ordered bool) *AsyncMapper[In, Out]

WithOrdered controls whether output preserves input order. If ordered=true (default), output items maintain their input sequence despite variable processing times. If ordered=false, results are emitted as they complete.

func (*AsyncMapper[In, Out]) WithWorkers

func (a *AsyncMapper[In, Out]) WithWorkers(workers int) *AsyncMapper[In, Out]

WithWorkers sets the number of concurrent workers. If not set, defaults to runtime.NumCPU().

type BatchConfig

type BatchConfig struct {
	// MaxLatency is the maximum time to wait before emitting a partial batch.
	// If set, a batch will be emitted after this duration even if it's not full.
	MaxLatency time.Duration

	// MaxSize is the maximum number of items in a batch.
	// A batch is emitted immediately when it reaches this size.
	MaxSize int
}

BatchConfig configures batching behavior for the Batcher processor.

type Batcher

type Batcher[T any] struct {
	// contains filtered or unexported fields
}

Batcher collects items from a stream and groups them into batches based on size or time constraints. It emits a batch when either the maximum size is reached or the maximum latency expires, whichever comes first. This is useful for optimizing downstream operations that work more efficiently with groups of items rather than individual items.

Batcher handles errors by separating them from successful batches. Errors are passed through immediately without being included in batches. This ensures error visibility while maintaining batch integrity.

func NewBatcher

func NewBatcher[T any](config BatchConfig, clock Clock) *Batcher[T]

NewBatcher creates a processor that intelligently groups items into batches. Batches are emitted when either the size limit is reached OR the time limit expires, whichever comes first. This dual-trigger approach balances throughput with latency.

Key behaviors:

  • Errors are passed through immediately without affecting batches
  • Successful items are batched according to MaxSize and MaxLatency constraints
  • Timeouts are treated as normal batch emissions (not errors)
  • Memory usage is bounded by MaxSize configuration
  • Single-goroutine pattern prevents race conditions

When to use:

  • Optimizing database writes with bulk operations
  • Reducing API calls by batching requests
  • Implementing micro-batching for stream processing
  • Buffering events for periodic processing
  • Cost optimization through batch operations

Example:

// Batch up to 1000 items or 5 seconds, whichever comes first
batcher := streamz.NewBatcher[Event](streamz.BatchConfig{
	MaxSize:    1000,
	MaxLatency: 5 * time.Second,
}, streamz.RealClock)

batches := batcher.Process(ctx, events)
for result := range batches {
	if result.IsError() {
		log.Printf("Individual item error: %v", result.Error())
		continue
	}
	batch := result.Value()
	// Process batch of up to 1000 items
	// Never waits more than 5 seconds
	bulkInsert(batch)
}

// Optimize API calls with smart batching
apiBatcher := streamz.NewBatcher[Request](streamz.BatchConfig{
	MaxSize:    100,  // API limit
	MaxLatency: 100 * time.Millisecond, // Max acceptable delay
}, streamz.RealClock)

Parameters:

  • config: Batch configuration with size and latency constraints
  • clock: Clock interface for time operations (use RealClock in production)

Returns a new Batcher processor that groups items efficiently.

func (*Batcher[T]) Name

func (b *Batcher[T]) Name() string

Name returns the processor name for debugging and monitoring.

func (*Batcher[T]) Process

func (b *Batcher[T]) Process(ctx context.Context, in <-chan Result[T]) <-chan Result[[]T]

Process groups input items into batches according to the configured constraints. It returns a channel of Result[[]T] where successful results contain batches and error results contain individual item processing errors.

Batching behavior:

  • Errors pass through immediately without being batched
  • Successful items are collected into batches
  • Batches are emitted when MaxSize is reached OR MaxLatency expires
  • Final partial batch is emitted when input channel closes
  • Context cancellation stops processing immediately

Memory safety:

  • Bounded memory usage limited by MaxSize
  • Single timer instance - no timer leaks
  • Proper cleanup on context cancellation

type Buffer

type Buffer[T any] struct {
	// contains filtered or unexported fields
}

Buffer adds buffering capacity to a stream by creating an output channel with a buffer. This helps decouple producers and consumers, allowing producers to continue sending items even when consumers are temporarily slower.

Buffer is a pass-through processor that preserves all Result[T] items unchanged, whether they contain successful values or errors. It provides buffering between pipeline stages without any transformation logic.

func NewBuffer

func NewBuffer[T any](size int) *Buffer[T]

NewBuffer creates a processor with a simple buffered output channel. This provides basic decoupling between producers and consumers, allowing the producer to continue sending items even when the consumer is temporarily slower.

When to use:

  • Smoothing out temporary processing speed mismatches
  • Decoupling producer and consumer goroutines
  • Handling brief bursts of high throughput
  • Providing breathing room for downstream processors
  • Simple async boundaries in pipelines

Example:

// Add a buffer of 1000 items
buffer := streamz.NewBuffer[Message](1000)

// Producer can send up to 1000 items without blocking
buffered := buffer.Process(ctx, messages)
for msg := range buffered {
	// Slower processing won't block producer
	expensiveOperation(msg)
}

// Chain with other processors for burst handling
buffer := streamz.NewBuffer[Event](5000)
throttle := streamz.NewThrottle[Event](100) // 100/sec

buffered := buffer.Process(ctx, events)
limited := throttle.Process(ctx, buffered)

Parameters:

  • size: Buffer capacity (0 for unbuffered, >0 for buffered channel)

Returns a new Buffer processor with the specified capacity.

func (*Buffer[T]) Name

func (b *Buffer[T]) Name() string

Name returns the processor name for identification and debugging.

func (*Buffer[T]) Process

func (b *Buffer[T]) Process(ctx context.Context, in <-chan Result[T]) <-chan Result[T]

Process creates a buffered channel and passes through all Result[T] items unchanged. Both successful values and errors are preserved without modification. The buffer provides decoupling between producer and consumer goroutines.

type Clock

type Clock = clockz.Clock

Clock provides time operations for deterministic testing.

var RealClock Clock = clockz.RealClock

RealClock is the default Clock using standard time.

type DeadLetterQueue

type DeadLetterQueue[T any] struct {
	// contains filtered or unexported fields
}

DeadLetterQueue separates successful results from failed results into two distinct channels. Unlike standard processors that return a single Result[T] channel, DLQ returns two channels: one for successes and one for failures. This enables different downstream processing strategies for successful vs failed items.

Non-Consumed Channel Handling: If either output channel is not consumed, DLQ will drop items that cannot be sent to prevent deadlocks. Dropped items are logged and counted for monitoring.

Concurrent Behavior: DeadLetterQueue is safe for concurrent use. Multiple goroutines can consume from both output channels simultaneously. The internal distribution logic runs in a single goroutine to prevent race conditions.

Usage Examples:

// Separate successes and failures for different handling
dlq := streamz.NewDeadLetterQueue[Order](streamz.RealClock)
successes, failures := dlq.Process(ctx, orders)

// Process successes in main path
go func() {
	for success := range successes {
		processOrder(success.Value())
	}
}()

// Handle failures separately (logging, metrics, retry queue)
go func() {
	for failure := range failures {
		log.Printf("Order processing failed: %v", failure.Error())
		retryQueue.Send(failure)
	}
}()

// Or ignore failures if only successes matter
successes, _ := dlq.Process(ctx, orders)
// failures channel ignored - items will be dropped and logged

func NewDeadLetterQueue

func NewDeadLetterQueue[T any](clock Clock) *DeadLetterQueue[T]

NewDeadLetterQueue creates a new DeadLetterQueue processor. Uses the provided clock for timeout operations - use RealClock for production, fake clock for deterministic testing.

func (*DeadLetterQueue[T]) DroppedCount

func (dlq *DeadLetterQueue[T]) DroppedCount() uint64

DroppedCount returns the number of items dropped due to non-consumed channels.

func (*DeadLetterQueue[T]) Name

func (dlq *DeadLetterQueue[T]) Name() string

Name returns the processor name.

func (*DeadLetterQueue[T]) Process

func (dlq *DeadLetterQueue[T]) Process(ctx context.Context, in <-chan Result[T]) (success <-chan Result[T], failure <-chan Result[T])

Process separates the input stream into success and failure channels. Returns two channels: (successes, failures).

The distribution logic runs in a single goroutine to prevent race conditions. Both output channels are closed when the input channel closes or context is canceled.

If either output channel cannot accept an item (blocked consumer), the item is dropped and logged to prevent deadlocks. This is particularly important when only one of the two channels is consumed.
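The drop-instead-of-deadlock policy reduces to a non-blocking send. A stdlib sketch of that policy (illustrative only, not the DLQ's actual implementation, which also logs and uses atomic counters):

```go
package main

import "fmt"

// sendOrDrop attempts a non-blocking send; if the channel is blocked
// (e.g. nobody consumes it), the item is dropped and counted instead
// of stalling the distributor goroutine.
func sendOrDrop[T any](ch chan T, v T, dropped *int) {
	select {
	case ch <- v:
	default:
		*dropped++
	}
}

func main() {
	consumed := make(chan int, 2) // this consumer keeps up
	ignored := make(chan int)     // nobody reads this channel
	dropped := 0

	for i := 0; i < 2; i++ {
		sendOrDrop(consumed, i, &dropped)
		sendOrDrop(ignored, i, &dropped)
	}
	fmt.Println(len(consumed), dropped) // 2 2 — ignored channel's items were dropped
}
```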

func (*DeadLetterQueue[T]) WithName

func (dlq *DeadLetterQueue[T]) WithName(name string) *DeadLetterQueue[T]

WithName sets a custom name for the DeadLetterQueue (for logging and monitoring).

type Debounce

type Debounce[T any] struct {
	// contains filtered or unexported fields
}

Debounce emits items only after a quiet period with no new items. It's useful for filtering out rapid successive events. Errors are passed through immediately without debouncing.

func NewDebounce

func NewDebounce[T any](duration time.Duration, clock Clock) *Debounce[T]

NewDebounce creates a processor that delays and coalesces rapid events. Only the last successful item in a rapid sequence is emitted after the specified duration of inactivity. Errors are passed through immediately.

When to use:

  • User input handling (e.g., search-as-you-type)
  • Sensor readings that fluctuate rapidly
  • File system change notifications
  • Preventing excessive API calls from UI events

Example:

// Debounce search queries - only search after 300ms of no typing
debounce := streamz.NewDebounce[string](300 * time.Millisecond, streamz.RealClock)
debounced := debounce.Process(ctx, searchQueries)

// Debounce sensor readings
debounce := streamz.NewDebounce[SensorData](time.Second, streamz.RealClock)
stable := debounce.Process(ctx, readings)

Parameters:

  • duration: The quiet period before emitting an item
  • clock: Clock interface for time operations

func (*Debounce[T]) Name

func (d *Debounce[T]) Name() string

Name returns the processor name for debugging and monitoring.

func (*Debounce[T]) Process

func (d *Debounce[T]) Process(ctx context.Context, in <-chan Result[T]) <-chan Result[T]

Process debounces the input stream, emitting only the last item after a period of quiet. Errors are passed through immediately without debouncing. The last successful item is emitted when the input channel closes.

type FanIn

type FanIn[T any] struct {
	// contains filtered or unexported fields
}

FanIn merges multiple Result[T] input channels into a single output channel. It implements the fan-in concurrency pattern, collecting Results from multiple sources and combining them into a single stream. This version uses the Result[T] pattern for unified error handling instead of dual-channel returns.

func NewFanIn

func NewFanIn[T any]() *FanIn[T]

NewFanIn creates a processor that merges multiple Result[T] channels into one. This implements the fan-in concurrency pattern, collecting Results from multiple sources and combining them into a single unified stream using the Result[T] pattern.

When to use:

  • Aggregating data from multiple sources with error handling
  • Collecting results from parallel workers that may fail
  • Merging event streams from different services
  • Consolidating logs or metrics with error propagation
  • Load balancing consumer implementation

Example:

// Merge Results from multiple sources
fanin := streamz.NewFanIn[Event]()

// Combine Result streams from different services
merged := fanin.Process(ctx,
	serviceA.EventResults(),
	serviceB.EventResults(),
	serviceC.EventResults())

// Process merged stream with unified error handling
for result := range merged {
	if result.IsError() {
		log.Printf("FanIn error: %v", result.Error())
		continue
	}
	processEvent(result.Value())
}

// Collect Results from parallel workers
fanin := streamz.NewFanIn[ProcessedData]()
workers := make([]<-chan Result[ProcessedData], numWorkers)
for i := range workers {
	workers[i] = startWorker(ctx, workQueue)
}
results := fanin.Process(ctx, workers...)

Returns a new FanIn processor for merging multiple Result streams.

func (*FanIn[T]) Process

func (*FanIn[T]) Process(ctx context.Context, ins ...<-chan Result[T]) <-chan Result[T]

Process merges multiple Result[T] channels into a single Result[T] channel. Both successful values and errors flow through the unified output channel. This eliminates the need for dual-channel error handling patterns.

type FanOut

type FanOut[T any] struct {
	// contains filtered or unexported fields
}

FanOut distributes Result[T] items from a single input channel to multiple output channels. It implements the fan-out concurrency pattern using the Result[T] pattern for unified error handling, duplicating each Result to all outputs, enabling parallel processing of both successful values and errors.

func NewFanOut

func NewFanOut[T any](count int) *FanOut[T]

NewFanOut creates a processor that distributes Result[T] items to multiple output channels. This implements the fan-out concurrency pattern with Result[T] support, duplicating each input Result to all output channels, enabling parallel processing of both successful values and errors.

When to use:

  • Parallel processing of the same data with error handling
  • Broadcasting events to multiple consumers that need error context
  • Implementing publish-subscribe patterns with unified error handling
  • Load distribution for CPU-intensive tasks with failure isolation
  • Creating processing pipelines with multiple branches and error propagation

Error behavior:

  • Errors are duplicated to all output channels (each gets an independent copy)
  • Each output channel receives exactly the same Result sequence
  • No error transformation occurs - errors flow through unchanged
  • Backpressure from slow consumers affects all outputs (blocking behavior)

Example:

// Distribute Result events to 3 parallel processors
fanout := streamz.NewFanOut[Event](3)
outputs := fanout.Process(ctx, eventResults)

// Each output gets a copy of every Result (success or error)
go processResultStream1(outputs[0]) // Real-time alerting
go processResultStream2(outputs[1]) // Analytics
go processResultStream3(outputs[2]) // Archival

// Fan out for parallel enrichment with error handling
fanout := streamz.NewFanOut[Record](runtime.NumCPU())
branches := fanout.Process(ctx, recordResults)

// Process each branch with different enrichers
enriched := make([]<-chan Result[EnrichedRecord], len(branches))
for i, branch := range branches {
	enriched[i] = enricher[i].Process(ctx, branch)
}
// Merge results back together with FanIn
merged := fanin.Process(ctx, enriched...)

Parameters:

  • count: Number of output channels to create

Returns a new FanOut processor that broadcasts Result[T] to multiple outputs.

func (*FanOut[T]) Process

func (f *FanOut[T]) Process(ctx context.Context, in <-chan Result[T]) []<-chan Result[T]

Process distributes Result[T] items from input to multiple output channels. Each Result (success or error) is duplicated to all output channels. The processor respects context cancellation and properly closes all output channels.
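The broadcast-to-all behavior, including the documented backpressure coupling, comes from sending each item to every output in turn. A stdlib sketch (the real FanOut does this over Result[T] channels):

```go
package main

import (
	"fmt"
	"sync"
)

// broadcast duplicates every input item to n output channels. Because
// sends are sequential and blocking, a slow reader on any one output
// stalls delivery to all of them — the backpressure behavior noted above.
func broadcast[T any](in <-chan T, n int) []<-chan T {
	outs := make([]chan T, n)
	for i := range outs {
		outs[i] = make(chan T)
	}
	go func() {
		for v := range in {
			for _, out := range outs {
				out <- v // every output gets its own copy
			}
		}
		for _, out := range outs {
			close(out)
		}
	}()
	ro := make([]<-chan T, n)
	for i, out := range outs {
		ro[i] = out
	}
	return ro
}

func main() {
	in := make(chan int, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)

	var wg sync.WaitGroup
	for i, out := range broadcast(in, 2) {
		wg.Add(1)
		go func(i int, out <-chan int) {
			defer wg.Done()
			sum := 0
			for v := range out {
				sum += v
			}
			fmt.Println("consumer", i, "sum:", sum) // each consumer sees all items
		}(i, out)
	}
	wg.Wait()
}
```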

type Filter

type Filter[T any] struct {
	// contains filtered or unexported fields
}

Filter selectively passes items through a stream based on a predicate function. Only items for which the predicate returns true are emitted to the output channel. Items that don't match the predicate are discarded.

Filter is one of the most fundamental stream processing operations, commonly used for:

  • Data validation and quality control
  • Business rule application
  • Security filtering and content moderation
  • Performance optimization by reducing downstream load
  • A/B testing and conditional data routing

func NewFilter

func NewFilter[T any](predicate func(T) bool) *Filter[T]

NewFilter creates a processor that selectively passes items based on a predicate. Items for which the predicate returns true are forwarded unchanged. Items for which the predicate returns false are discarded.

The predicate function should be pure (no side effects) and deterministic for consistent and predictable filtering behavior.

When to use:

  • Remove invalid or unwanted data from streams
  • Apply business rules and validation logic
  • Filter based on data quality requirements
  • Implement conditional processing logic
  • Reduce processing load by filtering upstream

Example:

// Filter positive numbers
positive := streamz.NewFilter(func(n int) bool {
	return n > 0
})

// Filter non-empty strings
nonEmpty := streamz.NewFilter(func(s string) bool {
	return strings.TrimSpace(s) != ""
})

// Filter valid orders
validOrders := streamz.NewFilter(func(order Order) bool {
	return order.ID != "" && order.Amount > 0 && order.Status == "pending"
})

results := positive.Process(ctx, input)
for result := range results {
	if result.IsError() {
		log.Printf("Processing error: %v", result.Error())
	} else {
		fmt.Printf("Filtered result: %v\n", result.Value())
	}
}

Parameters:

  • predicate: Function that returns true for items to keep, false to discard

Returns a new Filter processor.

func (*Filter[T]) Name

func (f *Filter[T]) Name() string

Name returns the processor name for debugging and monitoring.

func (*Filter[T]) Process

func (f *Filter[T]) Process(ctx context.Context, in <-chan Result[T]) <-chan Result[T]

Process filters input items based on the predicate function. Items that match the predicate (return true) are forwarded unchanged. Items that don't match the predicate are discarded. Errors are passed through unchanged without applying the predicate.

func (*Filter[T]) WithName

func (f *Filter[T]) WithName(name string) *Filter[T]

WithName sets a custom name for this processor. If not set, defaults to "filter". The name is used for debugging, monitoring, and error reporting.

type HashPartition

type HashPartition[T any, K comparable] struct {
	// contains filtered or unexported fields
}

HashPartition implements hash-based routing using a key extraction function and hash function. Keys are extracted from values and hashed to determine the target partition. Panic recovery ensures user function failures route to partition 0 (error partition).

func (*HashPartition[T, K]) Route

func (h *HashPartition[T, K]) Route(value T, partitionCount int) (idx int)

Route implements hash-based routing with panic recovery. Extracts key from value, hashes it, and uses improved distribution to avoid modulo bias.
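The bias-free mapping can be sketched with the standard library. FNV-1a and the multiply-shift reduction below are illustrative assumptions about the technique named above, not streamz's exact code:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor sketches hash-based routing: FNV-1a hashes the key, then a
// multiply-shift maps the 32-bit hash into [0, n) without modulo bias
// (Lemire's alternative to hash % n).
func partitionFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	// Scale the hash into [0, n): (hash * n) >> 32.
	return int((uint64(h.Sum32()) * uint64(n)) >> 32)
}

func main() {
	for _, k := range []string{"user-1", "user-2", "user-3"} {
		fmt.Printf("%s -> partition %d\n", k, partitionFor(k, 4))
	}
}
```

The same key always maps to the same partition, which is what makes hash partitioning preserve per-key ordering.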

type Mapper

type Mapper[In, Out any] struct {
	// contains filtered or unexported fields
}

Mapper transforms items from one type to another using a synchronous function. It processes items sequentially without goroutines, making it ideal for fast transformations where goroutine overhead isn't justified.

For CPU-intensive or I/O-bound operations that can benefit from parallelization, use AsyncMapper instead.

func NewMapper

func NewMapper[In, Out any](fn func(context.Context, In) (Out, error)) *Mapper[In, Out]

NewMapper creates a processor that transforms items synchronously. Unlike AsyncMapper, this processes items one at a time in sequence, making it suitable for simple, fast transformations.

When to use:

  • Simple type conversions and data formatting
  • Fast computations that don't justify goroutine overhead
  • Transformations that must maintain strict sequential processing
  • Operations where concurrency would add complexity without benefit
  • Memory-sensitive scenarios where goroutine pools are costly

Example:

// Simple type conversion
toString := streamz.NewMapper(func(ctx context.Context, n int) (string, error) {
	return fmt.Sprintf("%d", n), nil
})

// Data formatting
formatUser := streamz.NewMapper(func(ctx context.Context, u User) (string, error) {
	return fmt.Sprintf("%s <%s>", u.Name, u.Email), nil
})

// Mathematical transformations
double := streamz.NewMapper(func(ctx context.Context, n int) (int, error) {
	return n * 2, nil
})

results := toString.Process(ctx, input)
for result := range results {
	if result.IsError() {
		log.Printf("Processing error: %v", result.Error())
	} else {
		fmt.Printf("Result: %s\n", result.Value())
	}
}

Parameters:

  • fn: Transformation function that converts In to Out

Returns a new Mapper processor.

func (*Mapper[In, Out]) Name

func (m *Mapper[In, Out]) Name() string

Name returns the processor name for debugging and monitoring.

func (*Mapper[In, Out]) Process

func (m *Mapper[In, Out]) Process(ctx context.Context, in <-chan Result[In]) <-chan Result[Out]

Process transforms input items synchronously using the provided function. Errors are passed through unchanged. Success values are transformed and wrapped in new Result instances.

func (*Mapper[In, Out]) WithName

func (m *Mapper[In, Out]) WithName(name string) *Mapper[In, Out]

WithName sets a custom name for this processor. If not set, defaults to "mapper".

type Partition

type Partition[T any] struct {
	// contains filtered or unexported fields
}

Partition splits a single input channel into N output channels using configurable routing strategies. The number of partitions is fixed at creation time and channels are created during Process method execution. Supports hash-based partitioning via key extraction and round-robin distribution via rotating counter. All errors route to partition 0 for centralized error handling.

func NewHashPartition

func NewHashPartition[T any, K comparable](
	partitionCount int,
	keyExtractor func(T) K,
	bufferSize int,
) (*Partition[T], error)

NewHashPartition creates a hash-based partition using the provided key extractor. Uses FNV-1a hash by default for good distribution properties and performance. The keyExtractor function must be pure (no side effects, no shared mutable state).

func NewPartition

func NewPartition[T any](config PartitionConfig[T]) (*Partition[T], error)

NewPartition creates a partition with custom strategy configuration. Validates all configuration parameters and returns an error for invalid inputs.

func NewRoundRobinPartition

func NewRoundRobinPartition[T any](partitionCount int, bufferSize int) (*Partition[T], error)

NewRoundRobinPartition creates a round-robin partition that distributes values evenly. Uses atomic operations for lock-free thread safety.

func (*Partition[T]) Process

func (p *Partition[T]) Process(ctx context.Context, in <-chan Result[T]) []<-chan Result[T]

Process splits input across N output channels using the configured strategy. Channels are created during this method call, not in the constructor. Returns a slice of read-only channels for immediate consumption. All channels are closed when processing completes or the context is canceled.

type PartitionConfig

type PartitionConfig[T any] struct {
	Strategy       PartitionStrategy[T] // Routing strategy implementation
	PartitionCount int                  // Number of output partitions (must be > 0)
	BufferSize     int                  // Buffer size applied to all output channels (must be >= 0)
}

PartitionConfig configures partition behavior including strategy and buffer sizing.

type PartitionStrategy

type PartitionStrategy[T any] interface {
	Route(value T, partitionCount int) int // Returns partition index [0, N)
}

PartitionStrategy defines the routing behavior for distributing values across partitions. Implementations must be thread-safe as they may be called concurrently from multiple goroutines. The Route method must return a partition index in range [0, partitionCount).
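A custom strategy only needs a thread-safe Route method. This sketch redeclares the interface locally so it compiles on its own; PriorityStrategy and its Order type are hypothetical examples, not part of streamz:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// PartitionStrategy mirrors the interface above so the sketch is self-contained.
type PartitionStrategy[T any] interface {
	Route(value T, partitionCount int) int
}

type Order struct {
	ID       string
	Priority int
}

// PriorityStrategy is a hypothetical custom strategy: urgent orders always
// land on partition 0, everything else round-robins over partitions 1..N-1.
// The atomic counter keeps Route safe for concurrent callers, as required.
type PriorityStrategy struct{ next atomic.Int64 }

func (s *PriorityStrategy) Route(o Order, partitionCount int) int {
	if o.Priority > 5 {
		return 0
	}
	// Requires partitionCount >= 2 so non-urgent partitions exist.
	return 1 + int(s.next.Add(1))%(partitionCount-1)
}

func main() {
	var _ PartitionStrategy[Order] = &PriorityStrategy{} // compile-time check
	s := &PriorityStrategy{}
	for _, o := range []Order{{"A", 9}, {"B", 1}, {"C", 2}} {
		fmt.Printf("%s -> partition %d\n", o.ID, s.Route(o, 4))
	}
}
```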

type Result

type Result[T any] struct {
	// contains filtered or unexported fields
}

Result represents either a successful value or an error in stream processing. It captures both outcomes in one type, eliminating dual-channel patterns, and follows the Result type pattern common in functional programming languages. Metadata support was added to carry context through stream processing pipelines.

func AddWindowMetadata

func AddWindowMetadata[T any](result Result[T], meta WindowMetadata) Result[T]

AddWindowMetadata adds complete window metadata to a Result[T].

func NewError

func NewError[T any](item T, err error, processorName string) Result[T]

NewError creates a Result containing an error.

func NewSuccess

func NewSuccess[T any](value T) Result[T]

NewSuccess creates a Result containing a successful value.

func (Result[T]) Error

func (r Result[T]) Error() *StreamError[T]

Error returns the StreamError. Returns nil if this Result contains a successful value.

func (Result[T]) GetDurationMetadata

func (r Result[T]) GetDurationMetadata(key string) (time.Duration, bool, error)

GetDurationMetadata retrieves time.Duration metadata with enhanced type safety.

func (Result[T]) GetIntMetadata

func (r Result[T]) GetIntMetadata(key string) (value int, found bool, err error)

GetIntMetadata retrieves int metadata with enhanced type safety.

func (Result[T]) GetMetadata

func (r Result[T]) GetMetadata(key string) (interface{}, bool)

GetMetadata retrieves a metadata value by key. Returns the value and true if the key exists, nil and false otherwise. The caller must type-assert the returned value to the expected type.

func (Result[T]) GetStringMetadata

func (r Result[T]) GetStringMetadata(key string) (value string, found bool, err error)

GetStringMetadata retrieves string metadata with enhanced type safety. The (value, found, error) return values follow a three-state convention:

  • found=false, error=nil: key not present
  • found=false, error!=nil: key present but wrong type
  • found=true, error=nil: successful retrieval

func (Result[T]) GetTimeMetadata

func (r Result[T]) GetTimeMetadata(key string) (time.Time, bool, error)

GetTimeMetadata retrieves time.Time metadata with enhanced type safety.

func (Result[T]) HasMetadata

func (r Result[T]) HasMetadata() bool

HasMetadata returns true if this Result contains any metadata.

func (Result[T]) IsError

func (r Result[T]) IsError() bool

IsError returns true if this Result contains an error.

func (Result[T]) IsSuccess

func (r Result[T]) IsSuccess() bool

IsSuccess returns true if this Result contains a successful value.

func (Result[T]) Map

func (r Result[T]) Map(fn func(T) T) Result[T]

Map applies a function to the value if this Result is successful. If this Result contains an error, returns the error unchanged. Metadata is preserved through successful transformations.

func (Result[T]) MapError

func (r Result[T]) MapError(fn func(*StreamError[T]) *StreamError[T]) Result[T]

MapError applies a function to transform the error if this Result contains an error. If this Result is successful, returns the success value unchanged. Metadata is preserved through error transformations.

func (Result[T]) MetadataKeys

func (r Result[T]) MetadataKeys() []string

MetadataKeys returns all metadata keys for this Result. Returns empty slice if no metadata present.

func (Result[T]) Value

func (r Result[T]) Value() T

Value returns the successful value. Panics if called on a Result containing an error - always check IsSuccess() first.

func (Result[T]) ValueOr

func (r Result[T]) ValueOr(fallback T) T

ValueOr returns the successful value if present, otherwise returns the fallback.

func (Result[T]) WithMetadata

func (r Result[T]) WithMetadata(key string, value interface{}) Result[T]

WithMetadata returns a new Result with the specified metadata key-value pair. This is a thread-safe immutable operation - the original Result is unchanged. Multiple calls can be chained to add multiple metadata entries. Returns error for empty keys to prevent silent failures.

type RoundRobinPartition

type RoundRobinPartition[T any] struct {
	// contains filtered or unexported fields
}

RoundRobinPartition implements counter-based routing that distributes values evenly across partitions. Uses an atomic counter to ensure thread-safe operation without locks.

func (*RoundRobinPartition[T]) Route

func (r *RoundRobinPartition[T]) Route(_ T, partitionCount int) int

Route implements round-robin routing using atomic counter. Thread-safe operation without locks for high performance.
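The lock-free mechanism described above can be sketched with sync/atomic; this is an illustrative reconstruction, not streamz's source:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin sketches counter-based routing: an atomic counter is incremented
// per item and reduced modulo the partition count. Counter wrap-around is
// harmless because the modulo keeps indices in range.
type roundRobin struct{ counter atomic.Uint64 }

func (r *roundRobin) Route(partitionCount int) int {
	// Add returns the new value; subtracting one puts the first item on
	// partition 0.
	return int((r.counter.Add(1) - 1) % uint64(partitionCount))
}

func main() {
	var r roundRobin
	for i := 0; i < 6; i++ {
		fmt.Printf("item %d -> partition %d\n", i, r.Route(3))
	}
}
```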

type Sample

type Sample[T any] struct {
	// contains filtered or unexported fields
}

Sample randomly selects items from a stream based on a probability rate. It keeps successful items based on the configured rate (0.0 to 1.0) and always passes through errors unchanged.

Sample is used for:

  • Load shedding in high-volume streams
  • Creating statistical samples for monitoring
  • Random downsampling for performance optimization
  • A/B testing traffic distribution

The sampling decision is made independently for each item using cryptographically secure randomness. Items are either kept completely or dropped completely - no modification occurs.

func NewSample

func NewSample[T any](rate float64) *Sample[T]

NewSample creates a processor that randomly selects items based on probability. The rate parameter determines the probability (0.0 to 1.0) that each successful item will be kept in the stream. A rate of 0.0 drops all items, 1.0 keeps all items.

Error items are always passed through unchanged regardless of the rate.

When to use:

  • High-volume streams needing load reduction
  • Statistical sampling for monitoring/analytics
  • Performance optimization through data reduction
  • Random traffic splitting for testing
  • Memory pressure relief in processing pipelines

Example:

// Keep 10% of successful orders for monitoring
monitor := streamz.NewSample[Order](0.1)

// Half of metrics for storage optimization
storage := streamz.NewSample[Metric](0.5)

// Load testing with 1% of production traffic
loadTest := streamz.NewSample[Request](0.01).WithName("load-test-sample")

// A/B testing - 50/50 split
groupA := streamz.NewSample[User](0.5)

results := monitor.Process(ctx, input)
for result := range results {
	// Approximately 10% of successful items, all errors
	processMonitoringData(result)
}

Parameters:

  • rate: Probability (0.0-1.0) that successful items will be kept

Returns a new Sample processor. Panics if rate is outside the valid range [0.0, 1.0].

func (*Sample[T]) Name

func (s *Sample[T]) Name() string

Name returns the processor name for debugging and monitoring.

func (*Sample[T]) Process

func (s *Sample[T]) Process(ctx context.Context, in <-chan Result[T]) <-chan Result[T]

Process randomly selects successful items based on the configured rate. Each successful item has an independent probability of being kept. Error items are always passed through unchanged.

The selection uses crypto/rand for secure randomness.
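An independent per-item decision over crypto/rand can be sketched as follows; the uniform-float construction is an assumption about the mechanism, not streamz's exact code:

```go
package main

import (
	"crypto/rand"
	"encoding/binary"
	"fmt"
)

// keep draws a uniform value in [0, 1) from crypto/rand and keeps the item
// when it falls below the configured rate.
func keep(rate float64) bool {
	var buf [8]byte
	if _, err := rand.Read(buf[:]); err != nil {
		return true // fail open: keep the item if randomness is unavailable
	}
	// Use 53 random bits for a uniform float64 in [0, 1).
	u := binary.BigEndian.Uint64(buf[:]) >> 11
	return float64(u)/(1<<53) < rate
}

func main() {
	kept := 0
	for i := 0; i < 10000; i++ {
		if keep(0.1) {
			kept++
		}
	}
	fmt.Printf("kept %d of 10000 (roughly 10%% expected)\n", kept)
}
```

Because each decision is independent, the kept fraction only approximates the rate; it is not an exact quota.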

func (*Sample[T]) Rate

func (s *Sample[T]) Rate() float64

Rate returns the current sampling rate.

func (*Sample[T]) WithName

func (s *Sample[T]) WithName(name string) *Sample[T]

WithName sets a custom name for this processor. If not set, defaults to "sample". The name is used for debugging, monitoring, and error reporting.

type SessionWindow

type SessionWindow[T any] struct {
	// contains filtered or unexported fields
}

SessionWindow groups Results into dynamic windows based on activity gaps. A new session starts after a period of inactivity (gap), making it ideal for grouping related events that occur in bursts with quiet periods between them.

This version processes Result[T] streams, capturing both successful values and errors within each session for comprehensive session analysis and error correlation within activity periods.

Key characteristics:

  • Dynamic duration: Sessions vary based on activity patterns
  • Key-based: Multiple concurrent sessions via key extraction
  • Activity-driven: Extends with each new item, closes after gap

Performance characteristics:

  • Session closure latency: gap/8 average, gap/4 maximum (checked at gap/4 intervals)
  • Memory usage: O(active_sessions × items_per_session)
  • Processing overhead: Single map lookup per item
  • Goroutine usage: 1 goroutine per processor instance (no timer callbacks)
  • Session checking frequency: gap/4 (balanced latency vs CPU usage)

func NewSessionWindow

func NewSessionWindow[T any](keyFunc func(Result[T]) string, clock Clock) *SessionWindow[T]

NewSessionWindow creates a processor that groups Results into session-based windows. Sessions are defined by periods of activity separated by gaps of inactivity. The keyFunc extracts a session key from each Result, allowing multiple concurrent sessions. Use the fluent API to configure optional behavior like gap duration.

When to use:

  • User activity tracking with error monitoring (web sessions, app usage)
  • Grouping related log entries or transactions with failure analysis
  • Detecting work patterns with natural breaks and error clustering
  • Conversation threading in chat applications with error handling
  • Batch processing of related events with failure correlation
  • API request session analysis with success/failure rates

Example:

// Group user actions into sessions with error tracking (30-minute default gap)
sessions := streamz.NewSessionWindow(
	func(result Result[UserAction]) string {
		if result.IsError() {
			// Use error context to extract user ID for session grouping
			return result.Error().Item.UserID
		}
		return result.Value().UserID
	},
	streamz.RealClock,
)

// Custom gap duration for faster session timeout
sessions := streamz.NewSessionWindow(
	func(result Result[UserAction]) string {
		// Extract user ID from either success or error
		if result.IsSuccess() {
			return result.Value().UserID
		}
		return result.Error().Item.UserID
	},
	streamz.RealClock,
).WithGap(10*time.Minute)

results := sessions.Process(ctx, actionResults)
for result := range results {
	// Each result has session metadata attached
	if meta, err := streamz.GetWindowMetadata(result); err == nil {
		fmt.Printf("Action in session [%s]: %v from %s to %s\n",
			*meta.SessionKey,
			result.Value(),
			meta.Start.Format("15:04:05"),
			meta.End.Format("15:04:05"))
	}
}

// Collect into sessions for analysis when needed
collector := streamz.NewWindowCollector[UserAction]()
collections := collector.Process(ctx, results)
for collection := range collections {
	values := collection.Values()    // Successful actions
	errors := collection.Errors()    // Failed actions
	totalActions := collection.Count()
	successRate := float64(collection.SuccessCount()) / float64(totalActions) * 100

	// Alert on sessions with high error rates
	if successRate < 80 && totalActions > 5 {
		alert.Send("User session with high error rate", collection)
	}
}

Parameters:

  • keyFunc: Extracts session identifier from Results (handles both success and errors)
  • clock: Clock interface for time operations (use RealClock for production)

Returns a new SessionWindow processor with fluent configuration.

Performance notes:

  • Memory scales with number of concurrent sessions
  • Session closure latency: gap/8 average, gap/4 maximum
  • Best for activity-based grouping with natural boundaries

func (*SessionWindow[T]) Name

func (w *SessionWindow[T]) Name() string

Name returns the processor name for debugging and monitoring.

func (*SessionWindow[T]) Process

func (w *SessionWindow[T]) Process(ctx context.Context, in <-chan Result[T]) <-chan Result[T]

Process groups Results into session-based windows, emitting individual Results with session metadata. Both successful values and errors extend sessions, allowing comprehensive analysis of user behavior, error patterns, and success rates within natural activity periods.

Session behavior:

  • New session starts with first Result for a key
  • Session extends with each new Result (success or error) for that key
  • Session closes after gap duration with no activity
  • Multiple concurrent sessions supported via key extraction
  • All Results within session timeframe get session metadata attached

Implementation uses single-goroutine architecture with periodic session checking at gap/4 intervals. Enhanced boundary tracking separates session state management from emitted metadata to handle dynamic session extension correctly:

  • Average latency: gap/8 (uniformly distributed)
  • Maximum latency: gap/4 (worst case)
  • Example: 30-minute gap = 3.75 minute average, 7.5 minute max closure delay

Performance and resource usage:

  • Memory scales with number of concurrent sessions
  • No memory growth within sessions (bounded by activity)
  • CPU usage: O(active_sessions) every gap/4 interval
  • Thread-safe: Single goroutine prevents timer callback races
  • Efficient cleanup: Expired sessions removed immediately

Trade-offs:

  • Delayed emission (gap/8 average) vs real-time complexity
  • Single goroutine safety vs multiple timer overhead
  • Memory for all active sessions vs streaming aggregation

func (*SessionWindow[T]) WithGap

func (w *SessionWindow[T]) WithGap(gap time.Duration) *SessionWindow[T]

WithGap sets the maximum time between Results in the same session. If not set, defaults to 30 minutes.

func (*SessionWindow[T]) WithName

func (w *SessionWindow[T]) WithName(name string) *SessionWindow[T]

WithName sets a custom name for this processor. If not set, defaults to "session-window".

type SlidingWindow

type SlidingWindow[T any] struct {
	// contains filtered or unexported fields
}

SlidingWindow groups Results into overlapping time-based windows. Unlike tumbling windows, sliding windows can overlap, allowing for smooth transitions and rolling calculations over time periods.

This version processes Result[T] streams, capturing both successful values and errors within overlapping time windows for comprehensive monitoring and analysis.

Key characteristics:

  • Overlapping: Items can belong to multiple windows
  • Configurable slide: Control overlap with slide interval
  • Smooth aggregations: Better for trend detection

Performance characteristics:

  • Window emission latency: At window.End time (size duration after window.Start)
  • Memory usage: O(active_windows × items_per_window) where active_windows = size/slide
  • Processing overhead: O(active_windows) map operations per item
  • Goroutine usage: 1 goroutine per processor instance
  • Overlap factor: size/slide determines number of concurrent windows

func NewSlidingWindow

func NewSlidingWindow[T any](size time.Duration, clock Clock) *SlidingWindow[T]

NewSlidingWindow creates a processor that groups Results into overlapping time windows. Each window has a fixed duration (size) and windows are created at regular intervals (slide). Use the fluent API to configure optional behavior like slide interval.

When to use:

  • Computing rolling averages with error rates over time
  • Smooth trend analysis with overlapping data points and failure tracking
  • Real-time dashboards with continuous updates and health monitoring
  • Detecting patterns that might span window boundaries
  • Gradual transitions in time-series analysis with error correlation

Example:

// Tumbling window behavior (no overlap) with Result[T]
window := streamz.NewSlidingWindow[Metric](5*time.Minute, streamz.RealClock)

// Overlapping windows: 5-minute window, new window every minute
window := streamz.NewSlidingWindow[Metric](5*time.Minute, streamz.RealClock).
	WithSlide(time.Minute)

results := window.Process(ctx, metricResults)
for result := range results {
	// Each result has window metadata attached
	if meta, err := streamz.GetWindowMetadata(result); err == nil {
		fmt.Printf("Metric in sliding window [%s-%s]: %v\n",
			meta.Start.Format("15:04"),
			meta.End.Format("15:04"),
			result.Value())
	}
}

// Collect into windows for analysis when needed
collector := streamz.NewWindowCollector[Metric]()
collections := collector.Process(ctx, results)
for collection := range collections {
	values := collection.Values()   // Successful metrics only
	errors := collection.Errors()   // Failed metrics only
	successRate := float64(collection.SuccessCount()) / float64(collection.Count()) * 100

	if successRate < 90 {
		alert.Send("Success rate below threshold", collection)
	}
}

Parameters:

  • size: Duration of each window (must be > 0)
  • clock: Clock interface for time operations (use RealClock for production)

Returns a new SlidingWindow processor with fluent configuration.

Performance notes:

  • Memory scales with overlap factor (size/slide)
  • CPU overhead proportional to number of active windows
  • Best for smooth aggregations and trend detection

func (*SlidingWindow[T]) Name

func (w *SlidingWindow[T]) Name() string

Name returns the processor name for debugging and monitoring.

func (*SlidingWindow[T]) Process

func (w *SlidingWindow[T]) Process(ctx context.Context, in <-chan Result[T]) <-chan Result[T]

Process groups Results into overlapping time windows, emitting individual Results with window metadata. Results can belong to multiple windows if they overlap. Both successful values and errors are captured with their window context, enabling comprehensive analysis of patterns across overlapping time periods.

Window behavior:

  • Each Result gets window metadata attached (start, end, type, size, slide)
  • Results are assigned to all overlapping windows they belong to
  • Results are emitted when their windows expire (current time > window end)
  • All Results (success and errors) within the window timeframe are included

Performance and resource usage:

  • Memory usage scales with overlap: up to size/slide concurrent windows
  • Each item is duplicated in all overlapping windows (by reference)
  • Emission checked at slide intervals via ticker
  • Special optimization: When slide == size, uses efficient tumbling mode
  • Thread-safe: Single goroutine architecture prevents races

Trade-offs:

  • Higher memory usage than tumbling windows due to overlap
  • More CPU per item (checking multiple windows)
  • Smoother aggregations and trend detection
  • Better for detecting patterns spanning window boundaries

func (*SlidingWindow[T]) WithName

func (w *SlidingWindow[T]) WithName(name string) *SlidingWindow[T]

WithName sets a custom name for this processor. If not set, defaults to "sliding-window".

func (*SlidingWindow[T]) WithSlide

func (w *SlidingWindow[T]) WithSlide(slide time.Duration) *SlidingWindow[T]

WithSlide sets the slide interval for creating new windows. If not set, defaults to the window size (tumbling window behavior).

Parameters:

  • slide: Time interval between window starts. If smaller than size, windows overlap.

Returns the SlidingWindow for method chaining.

type StreamError

type StreamError[T any] struct {
	// Item is the original item that caused the processing error.
	Item T

	// Err is the underlying error that occurred during processing.
	Err error

	// ProcessorName identifies which processor generated the error.
	ProcessorName string

	// Timestamp records when the error occurred.
	Timestamp time.Time
}

StreamError represents an error that occurred during stream processing. It captures both the item that caused the error and the error itself, enabling better debugging and error handling strategies.

func NewStreamError

func NewStreamError[T any](item T, err error, processorName string) *StreamError[T]

NewStreamError creates a new StreamError with the current timestamp.

func (*StreamError[T]) Error

func (se *StreamError[T]) Error() string

Error implements the error interface.

func (*StreamError[T]) String

func (se *StreamError[T]) String() string

String returns a human-readable representation of the error.

func (*StreamError[T]) Unwrap

func (se *StreamError[T]) Unwrap() error

Unwrap returns the underlying error, enabling error wrapping chains.

type Switch

type Switch[T any, K comparable] struct {
	// contains filtered or unexported fields
}

Switch routes Result[T] to multiple output channels based on predicate evaluation. Errors bypass predicate evaluation and go directly to the error channel. Successful values are evaluated by the predicate to determine routing.

func NewSwitch

func NewSwitch[T any, K comparable](predicate func(T) K, config SwitchConfig[K]) *Switch[T, K]

NewSwitch creates a Switch with full configuration options.

func NewSwitchSimple

func NewSwitchSimple[T any, K comparable](predicate func(T) K) *Switch[T, K]

NewSwitchSimple creates a Switch with default configuration (unbuffered, no default route).

func (*Switch[T, K]) AddRoute

func (s *Switch[T, K]) AddRoute(key K) <-chan Result[T]

AddRoute explicitly creates a route for the given key.

func (*Switch[T, K]) ErrorChannel

func (s *Switch[T, K]) ErrorChannel() <-chan Result[T]

ErrorChannel returns read-only access to the error channel.

func (*Switch[T, K]) HasRoute

func (s *Switch[T, K]) HasRoute(key K) bool

HasRoute checks if a route exists for the given key.

func (*Switch[T, K]) Process

func (s *Switch[T, K]) Process(ctx context.Context, in <-chan Result[T]) (routes map[K]<-chan Result[T], errors <-chan Result[T])

Process routes input Results to output channels based on predicate evaluation. Returns a map of read-only route channels and a read-only error channel. All channels are closed when processing completes or the context is canceled.

func (*Switch[T, K]) RemoveRoute

func (s *Switch[T, K]) RemoveRoute(key K) bool

RemoveRoute removes a route and closes its channel.

func (*Switch[T, K]) RouteKeys

func (s *Switch[T, K]) RouteKeys() []K

RouteKeys returns all current route keys.

type SwitchConfig

type SwitchConfig[K comparable] struct {
	DefaultKey *K  // Route for unknown predicate results (nil = drop)
	BufferSize int // Per-route channel buffer size (0 = unbuffered)
}

SwitchConfig configures Switch behavior.
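The routing decision behind Switch can be sketched over slices instead of channels (Order and the tier predicate are hypothetical; streamz routes over live channels):

```go
package main

import "fmt"

type Order struct {
	ID    string
	Total float64
}

// route applies Switch-style semantics: the predicate computes a key per
// value; known keys append to their route, unknown keys fall back to
// defaultKey when set and are dropped otherwise, matching SwitchConfig.
func route[T any, K comparable](values []T, predicate func(T) K, routes map[K][]T, defaultKey *K) map[K][]T {
	for _, v := range values {
		k := predicate(v)
		if _, ok := routes[k]; !ok {
			if defaultKey == nil {
				continue // no route and no default: drop the value
			}
			k = *defaultKey
		}
		routes[k] = append(routes[k], v)
	}
	return routes
}

func main() {
	tier := func(o Order) string {
		if o.Total >= 100 {
			return "large"
		}
		return "small"
	}
	routes := map[string][]Order{"large": nil, "small": nil}
	out := route([]Order{{"A", 250}, {"B", 20}, {"C", 150}}, tier, routes, nil)
	fmt.Printf("large=%d small=%d\n", len(out["large"]), len(out["small"]))
}
```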

type Tap

type Tap[T any] struct {
	// contains filtered or unexported fields
}

Tap executes a side effect function for each item while passing items through unchanged. It's used for logging, debugging, monitoring, metrics collection, and any other observational operations that shouldn't modify the data flow.

Tap is the simplest processor in streamz - it observes without interfering. The side effect function is called for every item (both success and error cases) but has no effect on what gets passed to the next stage of the pipeline.

func NewTap

func NewTap[T any](fn func(Result[T])) *Tap[T]

NewTap creates a processor that executes a side effect function on each Result[T] while passing all items through unchanged. The side effect function receives the complete Result[T], allowing it to handle both success and error cases.

When to use:

  • Debug logging and tracing
  • Metrics collection and monitoring
  • Audit trails and compliance logging
  • Performance monitoring and profiling
  • Testing and verification
  • Side effect operations that don't modify data

Example:

// Simple logging
logger := streamz.NewTap(func(result Result[Order]) {
	if result.IsError() {
		log.Printf("Error processing order: %v", result.Error())
	} else {
		log.Printf("Order processed: %+v", result.Value())
	}
})

// Metrics collection
var processedCount, errorCount atomic.Int64
metrics := streamz.NewTap(func(result Result[Order]) {
	if result.IsError() {
		errorCount.Add(1)
	} else {
		processedCount.Add(1)
	}
})

// Debug at specific pipeline stage
debug := streamz.NewTap(func(result Result[Order]) {
	if result.IsSuccess() {
		fmt.Printf("DEBUG: Order %s at validation stage\n",
			result.Value().ID)
	}
}).WithName("validation-debug")

results := logger.Process(ctx, input)
for result := range results {
	// Side effects executed, items unchanged
	fmt.Printf("Result: %+v\n", result)
}

Parameters:

  • fn: Side effect function that receives each Result[T]

Returns a new Tap processor.

func (*Tap[T]) Name

func (t *Tap[T]) Name() string

Name returns the processor name for debugging and monitoring.

func (*Tap[T]) Process

func (t *Tap[T]) Process(ctx context.Context, in <-chan Result[T]) <-chan Result[T]

Process executes the side effect function on each item while passing all items through unchanged. Both successful values and errors are observed and forwarded. The side effect function is called with the complete Result[T], allowing it to distinguish between success and error cases.

func (*Tap[T]) WithName

func (t *Tap[T]) WithName(name string) *Tap[T]

WithName sets a custom name for this processor. If not set, defaults to "tap". The name is used for debugging, monitoring, and error reporting.

type Throttle

type Throttle[T any] struct {
	// contains filtered or unexported fields
}

Throttle limits the rate of items passing through the stream using leading edge behavior. It emits the first item immediately and then ignores subsequent items for a cooldown period. Errors are passed through immediately without throttling.

Concurrent Behavior: Multiple goroutines may call Process() on the same Throttle instance. The throttling state (lastEmit) is shared across all Process() calls.

func NewThrottle

func NewThrottle[T any](duration time.Duration, clock Clock) *Throttle[T]

NewThrottle creates a processor that implements leading edge throttling. The first item is emitted immediately, then subsequent items are ignored until the cooldown period expires. Errors are passed through immediately.

When to use:

  • Prevent overwhelming downstream services with rapid requests
  • Implement "first action wins" behavior for rapid user interactions
  • Rate limiting API calls with immediate first response
  • Controlling burst traffic patterns

Example:

// Throttle button clicks - only first click processed per 500ms
throttle := streamz.NewThrottle[ClickEvent](500 * time.Millisecond, streamz.RealClock)
processed := throttle.Process(ctx, clicks)

// Throttle API requests - first request immediate, others wait
throttle := streamz.NewThrottle[APIRequest](time.Second, streamz.RealClock)
limited := throttle.Process(ctx, requests)

Parameters:

  • duration: The cooldown period during which subsequent items are ignored. If duration is 0, all items pass through without throttling.
  • clock: Clock interface for time operations

func (*Throttle[T]) Name

func (th *Throttle[T]) Name() string

Name returns the processor name for debugging and monitoring.

func (*Throttle[T]) Process

func (th *Throttle[T]) Process(ctx context.Context, in <-chan Result[T]) <-chan Result[T]

Process throttles the input stream using leading edge behavior. The first item is emitted immediately, then subsequent items are ignored until the cooldown period expires. Errors are passed through immediately. Uses timestamp comparison instead of timer goroutines for race-free operation.

type Ticker

type Ticker = clockz.Ticker

Ticker delivers ticks at intervals.

type Timer

type Timer = clockz.Timer

Timer represents a single event timer.

type TumblingWindow

type TumblingWindow[T any] struct {
	// contains filtered or unexported fields
}

TumblingWindow groups items into fixed-size, non-overlapping time windows. Each item gets window metadata attached and is emitted when its window expires, making it ideal for time-based aggregations with Result[T] metadata flow.

This version processes Result[T] streams, attaching window metadata to each individual Result for comprehensive monitoring and downstream processing.

Key characteristics:

  • Non-overlapping: Each item belongs to exactly one window
  • Fixed duration: All windows have the same size
  • Metadata-driven: Results carry window context via metadata
  • Predictable emission: Results emit at exact window boundaries

Performance characteristics:

  • Result emission latency: Exactly at window boundary (size duration)
  • Memory usage: O(items_per_window) - bounded by window size
  • Processing overhead: Metadata attachment per item
  • Goroutine usage: 1 goroutine per processor instance
  • No unbounded memory growth - results are emitted and cleared

func NewTumblingWindow

func NewTumblingWindow[T any](size time.Duration, clock Clock) *TumblingWindow[T]

NewTumblingWindow creates a processor that groups Results into fixed-size time windows. Unlike sliding windows, tumbling windows don't overlap - each Result belongs to exactly one window. Windows are emitted when their time period expires.

When to use:

  • Time-based aggregations with error tracking (hourly stats, daily summaries)
  • Periodic batch processing with failure monitoring
  • Rate calculations over fixed intervals
  • Log analysis and error reporting over time periods
  • Metrics collection with success/failure rates

Example:

// Process events with 1-minute window metadata
window := streamz.NewTumblingWindow[Event](time.Minute, streamz.RealClock)

results := window.Process(ctx, eventResults)
for result := range results {
	// Each result now has window metadata attached
	if meta, err := streamz.GetWindowMetadata(result); err == nil {
		fmt.Printf("Event in window [%s - %s]: %v\n",
			meta.Start.Format("15:04:05"),
			meta.End.Format("15:04:05"),
			result.Value())
	}
}

// Alternatively, collect Results into traditional windows (instead of ranging above)
collector := streamz.NewWindowCollector[Event]()
collections := collector.Process(ctx, results)
for collection := range collections {
	values := collection.Values()   // Only successful events
	errors := collection.Errors()   // Only errors
	generateReport(values, errors)
}

Parameters:

  • size: Duration of each window (must be > 0)
  • clock: Clock interface for time operations (use RealClock for production)

Returns a new TumblingWindow processor for time-based grouping with Result[T] support.

Performance notes:

  • Optimal for non-overlapping aggregations
  • Minimal memory overhead (single active window)
  • Predictable latency: exactly window size duration

func (*TumblingWindow[T]) Name

func (w *TumblingWindow[T]) Name() string

Name returns the processor name for debugging and monitoring.

func (*TumblingWindow[T]) Process

func (w *TumblingWindow[T]) Process(ctx context.Context, in <-chan Result[T]) <-chan Result[T]

Process groups Results into fixed-size time windows, emitting individual Results with window metadata. Both successful values and errors are captured with their window context, enabling comprehensive error tracking and success rate monitoring over time periods.

Window behavior:

  • Each Result gets window metadata attached (start, end, type, size)
  • Results are emitted exactly at their window boundary expiration
  • Empty windows produce no output
  • On context cancellation or input close, partial windows emit their Results if non-empty

Performance and resource usage:

  • Zero allocation for window tracking (single active window)
  • Predictable memory: size = items_per_window × average_item_size
  • Latency: Items buffered for up to window size duration
  • Thread-safe: Single goroutine architecture prevents races

func (*TumblingWindow[T]) WithName

func (w *TumblingWindow[T]) WithName(name string) *TumblingWindow[T]

WithName sets a custom name for this processor.

type WindowCollection

type WindowCollection[T any] struct {
	Start   time.Time
	End     time.Time
	Meta    WindowMetadata
	Results []Result[T]
}

WindowCollection represents aggregated results from a single window.

func (WindowCollection[T]) Count

func (wc WindowCollection[T]) Count() int

Count returns the total number of results in the window collection.

func (WindowCollection[T]) ErrorCount

func (wc WindowCollection[T]) ErrorCount() int

ErrorCount returns the number of error results in the window collection.

func (WindowCollection[T]) Errors

func (wc WindowCollection[T]) Errors() []*StreamError[T]

Errors returns all errors from the window collection.

func (WindowCollection[T]) SuccessCount

func (wc WindowCollection[T]) SuccessCount() int

SuccessCount returns the number of successful results in the window collection.

func (WindowCollection[T]) Values

func (wc WindowCollection[T]) Values() []T

Values returns all successful values from the window collection.

type WindowCollector

type WindowCollector[T any] struct {
	// contains filtered or unexported fields
}

WindowCollector aggregates Results with matching window metadata.

func NewWindowCollector

func NewWindowCollector[T any]() *WindowCollector[T]

NewWindowCollector creates a new WindowCollector for aggregating Results by window.

func (*WindowCollector[T]) Process

func (c *WindowCollector[T]) Process(ctx context.Context, in <-chan Result[T]) <-chan WindowCollection[T]

Process aggregates Results with matching window metadata into WindowCollections. Uses struct-based keys to eliminate string allocation overhead for high performance.

type WindowConfig

type WindowConfig struct {
	// Size is the duration of each window.
	Size time.Duration

	// Slide is the slide interval for sliding windows.
	// If 0 or equal to Size, windows don't overlap (tumbling windows).
	// If less than Size, windows overlap (sliding windows).
	Slide time.Duration

	// MaxCount is the maximum number of items per window.
	// If 0, there's no item count limit.
	MaxCount int
}

WindowConfig configures windowing behavior for window processors.

type WindowInfo

type WindowInfo struct {
	Size       time.Duration
	Slide      *time.Duration
	Gap        *time.Duration
	SessionKey *string
	Start      time.Time
	End        time.Time
	Type       WindowType
}

WindowInfo provides type-safe access to window metadata.

func GetWindowInfo

func GetWindowInfo[T any](result Result[T]) (WindowInfo, error)

GetWindowInfo extracts and validates window metadata with enhanced type safety.

type WindowMetadata

type WindowMetadata struct {
	Size       time.Duration
	Slide      *time.Duration
	Gap        *time.Duration
	SessionKey *string
	Start      time.Time
	End        time.Time
	Type       string
}

WindowMetadata encapsulates window-related metadata operations.

func GetWindowMetadata

func GetWindowMetadata[T any](result Result[T]) (WindowMetadata, error)

GetWindowMetadata extracts window metadata from a Result[T].

type WindowType

type WindowType string

WindowType represents the type of window.

const (
	WindowTypeTumbling WindowType = "tumbling"
	WindowTypeSliding  WindowType = "sliding"
	WindowTypeSession  WindowType = "session"
)

Window type constants.
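A typical use of these constants is dispatching on the window kind. The `overlaps` helper below is a hedged illustration, not part of the package; it encodes the property that only sliding windows can assign one item to multiple windows:

```go
package main

import "fmt"

// WindowType mirrors the string-based type documented above.
type WindowType string

const (
	WindowTypeTumbling WindowType = "tumbling"
	WindowTypeSliding  WindowType = "sliding"
	WindowTypeSession  WindowType = "session"
)

// overlaps reports whether windows of the given type can share items:
// a slide smaller than the size duplicates items across sliding windows,
// while tumbling and session windows assign each item to exactly one window.
func overlaps(t WindowType) bool {
	switch t {
	case WindowTypeSliding:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(overlaps(WindowTypeSliding), overlaps(WindowTypeTumbling)) // true false
}
```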

Directories

Path	Synopsis
testing	Package testing provides test utilities for streamz.
