Documentation
¶
Overview ¶
Package streamz provides type-safe, composable stream processing primitives that work with Go channels, enabling real-time data processing through batching, windowing, and other streaming operations.
The core abstraction uses the Result[T] pattern which unifies success and error cases into a single channel. This eliminates dual-channel complexity while providing explicit error handling and better monitoring capabilities.
Basic usage:
ctx := context.Background()
source := make(chan Result[int])
// Create a simple processing pipeline
fanin := streamz.NewFanIn[int]()
// Process returns a single Result[T] channel
results := fanin.Process(ctx, source)
// Handle both success and error cases from single channel
for result := range results {
if result.IsError() {
log.Printf("Processing error: %v", result.Error())
} else {
fmt.Printf("Got item: %v\n", result.Value())
}
}
The package provides various processors for common streaming patterns:
- Batching and unbatching
- Windowing (tumbling, sliding, session)
- Buffering with different strategies
- Filtering and mapping
- Fan-in and fan-out
- Rate limiting and flow control
- Deduplication
- Monitoring and observability
Package streamz provides streaming data processing capabilities, including time-related operations for deterministic testing.
Index ¶
- Constants
- func IsInWindow[T any](result Result[T], timestamp time.Time) (bool, error)
- func WindowDuration[T any](result Result[T]) (time.Duration, error)
- type AsyncMapper
- func (a *AsyncMapper[In, Out]) Name() string
- func (a *AsyncMapper[In, Out]) Process(ctx context.Context, in <-chan Result[In]) <-chan Result[Out]
- func (a *AsyncMapper[In, Out]) WithBufferSize(size int) *AsyncMapper[In, Out]
- func (a *AsyncMapper[In, Out]) WithName(name string) *AsyncMapper[In, Out]
- func (a *AsyncMapper[In, Out]) WithOrdered(ordered bool) *AsyncMapper[In, Out]
- func (a *AsyncMapper[In, Out]) WithWorkers(workers int) *AsyncMapper[In, Out]
- type BatchConfig
- type Batcher
- type Buffer
- type Clock
- type DeadLetterQueue
- func (dlq *DeadLetterQueue[T]) DroppedCount() uint64
- func (dlq *DeadLetterQueue[T]) Name() string
- func (dlq *DeadLetterQueue[T]) Process(ctx context.Context, in <-chan Result[T]) (success <-chan Result[T], failure <-chan Result[T])
- func (dlq *DeadLetterQueue[T]) WithName(name string) *DeadLetterQueue[T]
- type Debounce
- type FanIn
- type FanOut
- type Filter
- type HashPartition
- type Mapper
- type Partition
- type PartitionConfig
- type PartitionStrategy
- type Result
- func (r Result[T]) Error() *StreamError[T]
- func (r Result[T]) GetDurationMetadata(key string) (time.Duration, bool, error)
- func (r Result[T]) GetIntMetadata(key string) (value int, found bool, err error)
- func (r Result[T]) GetMetadata(key string) (interface{}, bool)
- func (r Result[T]) GetStringMetadata(key string) (value string, found bool, err error)
- func (r Result[T]) GetTimeMetadata(key string) (time.Time, bool, error)
- func (r Result[T]) HasMetadata() bool
- func (r Result[T]) IsError() bool
- func (r Result[T]) IsSuccess() bool
- func (r Result[T]) Map(fn func(T) T) Result[T]
- func (r Result[T]) MapError(fn func(*StreamError[T]) *StreamError[T]) Result[T]
- func (r Result[T]) MetadataKeys() []string
- func (r Result[T]) Value() T
- func (r Result[T]) ValueOr(fallback T) T
- func (r Result[T]) WithMetadata(key string, value interface{}) Result[T]
- type RoundRobinPartition
- type Sample
- type SessionWindow
- type SlidingWindow
- type StreamError
- type Switch
- func (s *Switch[T, K]) AddRoute(key K) <-chan Result[T]
- func (s *Switch[T, K]) ErrorChannel() <-chan Result[T]
- func (s *Switch[T, K]) HasRoute(key K) bool
- func (s *Switch[T, K]) Process(ctx context.Context, in <-chan Result[T]) (routes map[K]<-chan Result[T], errors <-chan Result[T])
- func (s *Switch[T, K]) RemoveRoute(key K) bool
- func (s *Switch[T, K]) RouteKeys() []K
- type SwitchConfig
- type Tap
- type Throttle
- type Ticker
- type Timer
- type TumblingWindow
- type WindowCollection
- type WindowCollector
- type WindowConfig
- type WindowInfo
- type WindowMetadata
- type WindowType
Examples ¶
Constants ¶
const (
	MetadataPartitionIndex    = "partition_index"    // int - target partition [0, N)
	MetadataPartitionTotal    = "partition_total"    // int - total partition count N
	MetadataPartitionStrategy = "partition_strategy" // string - "hash", "round_robin", or "error"
)
Standard partition metadata keys for tracing and debugging.
const (
	MetadataWindowStart  = "window_start"  // time.Time - window start time
	MetadataWindowEnd    = "window_end"    // time.Time - window end time
	MetadataWindowType   = "window_type"   // string - "tumbling", "sliding", "session"
	MetadataWindowSize   = "window_size"   // time.Duration - window duration
	MetadataWindowSlide  = "window_slide"  // time.Duration - slide interval (sliding only)
	MetadataWindowGap    = "window_gap"    // time.Duration - activity gap (session only)
	MetadataSessionKey   = "session_key"   // string - session identifier (session only)
	MetadataSource       = "source"        // string - data source identifier
	MetadataTimestamp    = "timestamp"     // time.Time - processing timestamp
	MetadataProcessor    = "processor"     // string - processor that added metadata
	MetadataRetryCount   = "retry_count"   // int - number of retries attempted
	MetadataSessionID    = "session_id"    // string - session identifier
)
Standard metadata keys for common use cases.
Variables ¶
This section is empty.
Functions ¶
func IsInWindow ¶
IsInWindow checks if a timestamp falls within the Result's window.
Types ¶
type AsyncMapper ¶
type AsyncMapper[In, Out any] struct { // contains filtered or unexported fields }
AsyncMapper processes items concurrently using multiple worker goroutines. It supports both ordered processing (preserving input sequence) and unordered processing (emitting results as they complete). This enables parallelization of CPU-intensive or I/O-bound operations while maintaining flexibility in ordering requirements.
Example (Ordered) ¶
Example demonstrates ordered concurrent processing.
ctx := context.Background()
// Simulate API enrichment with preserved order
type User struct {
ID int
Name string
}
type EnrichedUser struct {
ID int
Name string
Profile string
}
// Create ordered async mapper
enricher := NewAsyncMapper(func(_ context.Context, u User) (EnrichedUser, error) {
// Simulate API call - processing times vary but order is preserved
return EnrichedUser{
ID: u.ID,
Name: u.Name,
Profile: fmt.Sprintf("Profile for %s", u.Name),
}, nil
}).WithWorkers(3).WithOrdered(true)
// Create input stream
users := make(chan Result[User])
go func() {
defer close(users)
users <- NewSuccess(User{ID: 1, Name: "Alice"})
users <- NewSuccess(User{ID: 2, Name: "Bob"})
users <- NewSuccess(User{ID: 3, Name: "Carol"})
}()
// Process with preserved order
enriched := enricher.Process(ctx, users)
for result := range enriched {
if result.IsError() {
fmt.Printf("Error: %v\n", result.Error())
} else {
user := result.Value()
fmt.Printf("User %d: %s - %s\n", user.ID, user.Name, user.Profile)
}
}
Output:

User 1: Alice - Profile for Alice
User 2: Bob - Profile for Bob
User 3: Carol - Profile for Carol
Example (Unordered) ¶
Example demonstrates unordered concurrent processing for maximum throughput.
ctx := context.Background()
// Create unordered async mapper for maximum throughput
processor := NewAsyncMapper(func(_ context.Context, i int) (int, error) {
// Simulate CPU-intensive work
return i * i, nil
}).WithWorkers(4).WithOrdered(false)
// Create input stream
numbers := make(chan Result[int])
go func() {
defer close(numbers)
for i := 1; i <= 5; i++ {
numbers <- NewSuccess(i)
}
}()
// Process without order preservation
squares := processor.Process(ctx, numbers)
results := make([]int, 0, 5)
for result := range squares {
if result.IsError() {
fmt.Printf("Error: %v\n", result.Error())
} else {
results = append(results, result.Value())
}
}
// Sort for consistent output (order may vary in real usage)
sort.Ints(results)
fmt.Printf("Squares: %v\n", results)
Output:

Squares: [1 4 9 16 25]
func NewAsyncMapper ¶
func NewAsyncMapper[In, Out any](fn func(context.Context, In) (Out, error)) *AsyncMapper[In, Out]
NewAsyncMapper creates a processor that executes transformations concurrently. By default, it preserves input order and uses runtime.NumCPU() workers. Use the fluent API to configure behavior like worker count and ordering.
When to use:
- CPU-intensive transformations (image processing, encryption)
- I/O-bound operations (API calls, database queries)
- Parallel enrichment while optionally maintaining sequence
- Speeding up independent transformations
- Rate-limited API calls with concurrent workers
Example:
// Parallel API enrichment with preserved order
enricher := streamz.NewAsyncMapper(func(ctx context.Context, id string) (User, error) {
// Each API call happens in parallel
return fetchUserFromAPI(ctx, id)
})
// Custom worker count for rate-limited APIs
enricher := streamz.NewAsyncMapper(func(ctx context.Context, id string) (User, error) {
return fetchUserFromAPI(ctx, id)
}).WithWorkers(10)
// Unordered processing for maximum throughput
processor := streamz.NewAsyncMapper(func(ctx context.Context, img Image) (Thumbnail, error) {
return generateThumbnail(ctx, img)
}).WithOrdered(false).WithWorkers(runtime.NumCPU())
results := processor.Process(ctx, input)
for result := range results {
if result.IsError() {
log.Printf("Processing error: %v", result.Error())
} else {
fmt.Printf("Result: %+v\n", result.Value())
}
}
Parameters:
- fn: Transformation function that can be safely executed concurrently
Returns a new AsyncMapper processor with fluent configuration.
func (*AsyncMapper[In, Out]) Name ¶
func (a *AsyncMapper[In, Out]) Name() string
Name returns the processor name for debugging and monitoring.
func (*AsyncMapper[In, Out]) Process ¶
func (a *AsyncMapper[In, Out]) Process(ctx context.Context, in <-chan Result[In]) <-chan Result[Out]
Process transforms input items concurrently across multiple workers. In ordered mode, output maintains input sequence despite variable processing times. In unordered mode, results are emitted as they complete for maximum throughput. Errors are wrapped in StreamError with original item context.
func (*AsyncMapper[In, Out]) WithBufferSize ¶
func (a *AsyncMapper[In, Out]) WithBufferSize(size int) *AsyncMapper[In, Out]
WithBufferSize sets the reorder buffer size for ordered processing. This controls memory usage when processing times vary significantly. Only affects ordered mode. Defaults to 100.
func (*AsyncMapper[In, Out]) WithName ¶
func (a *AsyncMapper[In, Out]) WithName(name string) *AsyncMapper[In, Out]
WithName sets a custom name for this processor. If not set, defaults to "async-mapper".
func (*AsyncMapper[In, Out]) WithOrdered ¶
func (a *AsyncMapper[In, Out]) WithOrdered(ordered bool) *AsyncMapper[In, Out]
WithOrdered controls whether output preserves input order. If ordered=true (default), output items maintain their input sequence despite variable processing times. If ordered=false, results are emitted as they complete.
func (*AsyncMapper[In, Out]) WithWorkers ¶
func (a *AsyncMapper[In, Out]) WithWorkers(workers int) *AsyncMapper[In, Out]
WithWorkers sets the number of concurrent workers. If not set, defaults to runtime.NumCPU().
type BatchConfig ¶
type BatchConfig struct {
// MaxLatency is the maximum time to wait before emitting a partial batch.
// If set, a batch will be emitted after this duration even if it's not full.
MaxLatency time.Duration
// MaxSize is the maximum number of items in a batch.
// A batch is emitted immediately when it reaches this size.
MaxSize int
}
BatchConfig configures batching behavior for the Batcher processor.
type Batcher ¶
type Batcher[T any] struct { // contains filtered or unexported fields }
Batcher collects items from a stream and groups them into batches based on size or time constraints. It emits a batch when either the maximum size is reached or the maximum latency expires, whichever comes first. This is useful for optimizing downstream operations that work more efficiently with groups of items rather than individual items.
Batcher handles errors by separating them from successful batches. Errors are passed through immediately without being included in batches. This ensures error visibility while maintaining batch integrity.
func NewBatcher ¶
func NewBatcher[T any](config BatchConfig, clock Clock) *Batcher[T]
NewBatcher creates a processor that intelligently groups items into batches. Batches are emitted when either the size limit is reached OR the time limit expires, whichever comes first. This dual-trigger approach balances throughput with latency.
Key behaviors:
- Errors are passed through immediately without affecting batches
- Successful items are batched according to MaxSize and MaxLatency constraints
- Timeouts are treated as normal batch emissions (not errors)
- Memory usage is bounded by MaxSize configuration
- Single-goroutine pattern prevents race conditions
When to use:
- Optimizing database writes with bulk operations
- Reducing API calls by batching requests
- Implementing micro-batching for stream processing
- Buffering events for periodic processing
- Cost optimization through batch operations
Example:
// Batch up to 1000 items or 5 seconds, whichever comes first
batcher := streamz.NewBatcher[Event](streamz.BatchConfig{
MaxSize: 1000,
MaxLatency: 5 * time.Second,
}, streamz.RealClock)
batches := batcher.Process(ctx, events)
for result := range batches {
if result.IsError() {
log.Printf("Individual item error: %v", result.Error())
continue
}
batch := result.Value()
// Process batch of up to 1000 items
// Never waits more than 5 seconds
bulkInsert(batch)
}
// Optimize API calls with smart batching
apiBatcher := streamz.NewBatcher[Request](streamz.BatchConfig{
MaxSize: 100, // API limit
MaxLatency: 100 * time.Millisecond, // Max acceptable delay
}, streamz.RealClock)
Parameters:
- config: Batch configuration with size and latency constraints
- clock: Clock interface for time operations (use RealClock in production)
Returns a new Batcher processor that groups items efficiently.
func (*Batcher[T]) Process ¶
Process groups input items into batches according to the configured constraints. It returns a channel of Result[[]T] where successful results contain batches and error results contain individual item processing errors.
Batching behavior:
- Errors pass through immediately without being batched
- Successful items are collected into batches
- Batches are emitted when MaxSize is reached OR MaxLatency expires
- Final partial batch is emitted when input channel closes
- Context cancellation stops processing immediately
Memory safety:
- Bounded memory usage limited by MaxSize
- Single timer instance - no timer leaks
- Proper cleanup on context cancellation
type Buffer ¶
type Buffer[T any] struct { // contains filtered or unexported fields }
Buffer adds buffering capacity to a stream by creating an output channel with a buffer. This helps decouple producers and consumers, allowing producers to continue sending items even when consumers are temporarily slower.
Buffer is a pass-through processor that preserves all Result[T] items unchanged, whether they contain successful values or errors. It provides buffering between pipeline stages without any transformation logic.
func NewBuffer ¶
NewBuffer creates a processor with a simple buffered output channel. This provides basic decoupling between producers and consumers, allowing the producer to continue sending items even when the consumer is temporarily slower.
When to use:
- Smoothing out temporary processing speed mismatches
- Decoupling producer and consumer goroutines
- Handling brief bursts of high throughput
- Providing breathing room for downstream processors
- Simple async boundaries in pipelines
Example:
// Add a buffer of 1000 items
buffer := streamz.NewBuffer[Message](1000)
// Producer can send up to 1000 items without blocking
buffered := buffer.Process(ctx, messages)
for msg := range buffered {
// Slower processing won't block producer
expensiveOperation(msg)
}
// Chain with other processors for burst handling
buffer := streamz.NewBuffer[Event](5000)
throttle := streamz.NewThrottle[Event](100) // 100/sec
buffered := buffer.Process(ctx, events)
limited := throttle.Process(ctx, buffered)
Parameters:
- size: Buffer capacity (0 for unbuffered, >0 for buffered channel)
Returns a new Buffer processor with the specified capacity.
type DeadLetterQueue ¶
type DeadLetterQueue[T any] struct { // contains filtered or unexported fields }
DeadLetterQueue separates successful results from failed results into two distinct channels. Unlike standard processors that return a single Result[T] channel, DLQ returns two channels: one for successes and one for failures. This enables different downstream processing strategies for successful vs failed items.
Non-Consumed Channel Handling: If either output channel is not consumed, DLQ will drop items that cannot be sent to prevent deadlocks. Dropped items are logged and counted for monitoring.
Concurrent Behavior: DeadLetterQueue is safe for concurrent use. Multiple goroutines can consume from both output channels simultaneously. The internal distribution logic runs in a single goroutine to prevent race conditions.
Usage Examples:
// Separate successes and failures for different handling
dlq := streamz.NewDeadLetterQueue[Order](streamz.RealClock)
successes, failures := dlq.Process(ctx, orders)
// Process successes in main path
go func() {
for success := range successes {
processOrder(success.Value())
}
}()
// Handle failures separately (logging, metrics, retry queue)
go func() {
for failure := range failures {
log.Printf("Order processing failed: %v", failure.Error())
retryQueue.Send(failure)
}
}()
// Or ignore failures if only successes matter
successes, _ := dlq.Process(ctx, orders)
// failures channel ignored - items will be dropped and logged
func NewDeadLetterQueue ¶
func NewDeadLetterQueue[T any](clock Clock) *DeadLetterQueue[T]
NewDeadLetterQueue creates a new DeadLetterQueue processor. Uses the provided clock for timeout operations - use RealClock for production, fake clock for deterministic testing.
func (*DeadLetterQueue[T]) DroppedCount ¶
func (dlq *DeadLetterQueue[T]) DroppedCount() uint64
DroppedCount returns the number of items dropped due to non-consumed channels.
func (*DeadLetterQueue[T]) Name ¶
func (dlq *DeadLetterQueue[T]) Name() string
Name returns the processor name.
func (*DeadLetterQueue[T]) Process ¶
func (dlq *DeadLetterQueue[T]) Process(ctx context.Context, in <-chan Result[T]) (success <-chan Result[T], failure <-chan Result[T])
Process separates the input stream into success and failure channels. Returns two channels: (successes, failures).
The distribution logic runs in a single goroutine to prevent race conditions. Both output channels are closed when the input channel closes or context is canceled.
If either output channel cannot accept an item (blocked consumer), the item is dropped and logged to prevent deadlocks. This is particularly important when only one of the two channels is consumed.
func (*DeadLetterQueue[T]) WithName ¶
func (dlq *DeadLetterQueue[T]) WithName(name string) *DeadLetterQueue[T]
WithName sets a custom name for the DeadLetterQueue (for logging and monitoring).
type Debounce ¶
type Debounce[T any] struct { // contains filtered or unexported fields }
Debounce emits items only after a quiet period with no new items. It's useful for filtering out rapid successive events. Errors are passed through immediately without debouncing.
func NewDebounce ¶
NewDebounce creates a processor that delays and coalesces rapid events. Only the last successful item in a rapid sequence is emitted after the specified duration of inactivity. Errors are passed through immediately.
When to use:
- User input handling (e.g., search-as-you-type)
- Sensor readings that fluctuate rapidly
- File system change notifications
- Preventing excessive API calls from UI events
Example:
// Debounce search queries - only search after 300ms of no typing
debounce := streamz.NewDebounce[string](300 * time.Millisecond, streamz.RealClock)
debounced := debounce.Process(ctx, searchQueries)

// Debounce sensor readings
debounce := streamz.NewDebounce[SensorData](time.Second, streamz.RealClock)
stable := debounce.Process(ctx, readings)
Parameters:
- duration: The quiet period before emitting an item
- clock: Clock interface for time operations
type FanIn ¶
type FanIn[T any] struct { // contains filtered or unexported fields }
FanIn merges multiple Result[T] input channels into a single output channel. It implements the fan-in concurrency pattern, collecting Results from multiple sources and combining them into a single stream. This version uses the Result[T] pattern for unified error handling instead of dual-channel returns.
func NewFanIn ¶
NewFanIn creates a processor that merges multiple Result[T] channels into one. This implements the fan-in concurrency pattern, collecting Results from multiple sources and combining them into a single unified stream using the Result[T] pattern.
When to use:
- Aggregating data from multiple sources with error handling
- Collecting results from parallel workers that may fail
- Merging event streams from different services
- Consolidating logs or metrics with error propagation
- Load balancing consumer implementation
Example:
// Merge Results from multiple sources
fanin := streamz.NewFanIn[Event]()
// Combine Result streams from different services
merged := fanin.Process(ctx,
serviceA.EventResults(),
serviceB.EventResults(),
serviceC.EventResults())
// Process merged stream with unified error handling
for result := range merged {
if result.IsError() {
log.Printf("FanIn error: %v", result.Error())
continue
}
processEvent(result.Value())
}
// Collect Results from parallel workers
fanin := streamz.NewFanIn[ProcessedData]()
workers := make([]<-chan Result[ProcessedData], numWorkers)
for i := range workers {
workers[i] = startWorker(ctx, workQueue)
}
results := fanin.Process(ctx, workers...)
Returns a new FanIn processor for merging multiple Result streams.
type FanOut ¶
type FanOut[T any] struct { // contains filtered or unexported fields }
FanOut distributes Result[T] items from a single input channel to multiple output channels. It implements the fan-out concurrency pattern using the Result[T] pattern for unified error handling, duplicating each Result to all outputs, enabling parallel processing of both successful values and errors.
func NewFanOut ¶
NewFanOut creates a processor that distributes Result[T] items to multiple output channels. This implements the fan-out concurrency pattern with Result[T] support, duplicating each input Result to all output channels, enabling parallel processing of both successful values and errors.
When to use:
- Parallel processing of the same data with error handling
- Broadcasting events to multiple consumers that need error context
- Implementing publish-subscribe patterns with unified error handling
- Load distribution for CPU-intensive tasks with failure isolation
- Creating processing pipelines with multiple branches and error propagation
Error behavior:
- Errors are duplicated to all output channels (each gets an independent copy)
- Each output channel receives exactly the same Result sequence
- No error transformation occurs - errors flow through unchanged
- Backpressure from slow consumers affects all outputs (blocking behavior)
Example:
// Distribute Result events to 3 parallel processors
fanout := streamz.NewFanOut[Event](3)
outputs := fanout.Process(ctx, eventResults)
// Each output gets a copy of every Result (success or error)
go processResultStream1(outputs[0]) // Real-time alerting
go processResultStream2(outputs[1]) // Analytics
go processResultStream3(outputs[2]) // Archival
// Fan out for parallel enrichment with error handling
fanout := streamz.NewFanOut[Record](runtime.NumCPU())
branches := fanout.Process(ctx, recordResults)
// Process each branch with different enrichers
enriched := make([]<-chan Result[EnrichedRecord], len(branches))
for i, branch := range branches {
enriched[i] = enricher[i].Process(ctx, branch)
}
// Merge results back together with FanIn
merged := fanin.Process(ctx, enriched...)
Parameters:
- count: Number of output channels to create
Returns a new FanOut processor that broadcasts Result[T] to multiple outputs.
type Filter ¶
type Filter[T any] struct { // contains filtered or unexported fields }
Filter selectively passes items through a stream based on a predicate function. Only items for which the predicate returns true are emitted to the output channel. Items that don't match the predicate are discarded.
Filter is one of the most fundamental stream processing operations, commonly used for:
- Data validation and quality control
- Business rule application
- Security filtering and content moderation
- Performance optimization by reducing downstream load
- A/B testing and conditional data routing
func NewFilter ¶
NewFilter creates a processor that selectively passes items based on a predicate. Items for which the predicate returns true are forwarded unchanged. Items for which the predicate returns false are discarded.
The predicate function should be pure (no side effects) and deterministic for consistent and predictable filtering behavior.
When to use:
- Remove invalid or unwanted data from streams
- Apply business rules and validation logic
- Filter based on data quality requirements
- Implement conditional processing logic
- Reduce processing load by filtering upstream
Example:
// Filter positive numbers
positive := streamz.NewFilter(func(n int) bool {
return n > 0
})
// Filter non-empty strings
nonEmpty := streamz.NewFilter(func(s string) bool {
return strings.TrimSpace(s) != ""
})
// Filter valid orders
validOrders := streamz.NewFilter(func(order Order) bool {
return order.ID != "" && order.Amount > 0 && order.Status == "pending"
})
results := positive.Process(ctx, input)
for result := range results {
if result.IsError() {
log.Printf("Processing error: %v", result.Error())
} else {
fmt.Printf("Filtered result: %v\n", result.Value())
}
}
Parameters:
- predicate: Function that returns true for items to keep, false to discard
Returns a new Filter processor.
type HashPartition ¶
type HashPartition[T any, K comparable] struct { // contains filtered or unexported fields }
HashPartition implements hash-based routing using a key extraction function and hash function. Keys are extracted from values and hashed to determine the target partition. Panic recovery ensures user function failures route to partition 0 (error partition).
func (*HashPartition[T, K]) Route ¶
func (h *HashPartition[T, K]) Route(value T, partitionCount int) (idx int)
Route implements hash-based routing with panic recovery. Extracts key from value, hashes it, and uses improved distribution to avoid modulo bias.
type Mapper ¶
type Mapper[In, Out any] struct { // contains filtered or unexported fields }
Mapper transforms items from one type to another using a synchronous function. It processes items sequentially without goroutines, making it ideal for fast transformations that don't benefit from concurrency overhead.
For CPU-intensive or I/O-bound operations that can benefit from parallelization, use AsyncMapper instead.
func NewMapper ¶
NewMapper creates a processor that transforms items synchronously. Unlike AsyncMapper, this processes items one at a time in sequence, making it suitable for simple, fast transformations.
When to use:
- Simple type conversions and data formatting
- Fast computations that don't justify goroutine overhead
- Transformations that must maintain strict sequential processing
- Operations where concurrency would add complexity without benefit
- Memory-sensitive scenarios where goroutine pools are costly
Example:
// Simple type conversion
toString := streamz.NewMapper(func(ctx context.Context, n int) (string, error) {
return fmt.Sprintf("%d", n), nil
})
// Data formatting
formatUser := streamz.NewMapper(func(ctx context.Context, u User) (string, error) {
return fmt.Sprintf("%s <%s>", u.Name, u.Email), nil
})
// Mathematical transformations
double := streamz.NewMapper(func(ctx context.Context, n int) (int, error) {
return n * 2, nil
})
results := toString.Process(ctx, input)
for result := range results {
if result.IsError() {
log.Printf("Processing error: %v", result.Error())
} else {
fmt.Printf("Result: %s\n", result.Value())
}
}
Parameters:
- fn: Transformation function that converts In to Out
Returns a new Mapper processor.
type Partition ¶
type Partition[T any] struct { // contains filtered or unexported fields }
Partition splits a single input channel into N output channels using configurable routing strategies. The number of partitions is fixed at creation time and channels are created during Process method execution. Supports hash-based partitioning via key extraction and round-robin distribution via rotating counter. All errors route to partition 0 for centralized error handling.
func NewHashPartition ¶
func NewHashPartition[T any, K comparable](
	partitionCount int,
	keyExtractor func(T) K,
	bufferSize int,
) (*Partition[T], error)
NewHashPartition creates a hash-based partition using the provided key extractor. Uses FNV-1a hash by default for good distribution properties and performance. The keyExtractor function must be pure (no side effects, no shared mutable state).
func NewPartition ¶
func NewPartition[T any](config PartitionConfig[T]) (*Partition[T], error)
NewPartition creates a partition with custom strategy configuration. Validates all configuration parameters and returns an error for invalid inputs.
func NewRoundRobinPartition ¶
NewRoundRobinPartition creates a round-robin partition that distributes values evenly. Uses atomic operations for lock-free thread safety.
func (*Partition[T]) Process ¶
Process splits input across N output channels using the configured strategy. Channels are created during this method call, not in the constructor. Returns a read-only slice of channels for immediate consumption. All channels are closed when processing completes or context is canceled.
type PartitionConfig ¶
type PartitionConfig[T any] struct {
	Strategy       PartitionStrategy[T] // Routing strategy implementation
	PartitionCount int                  // Number of output partitions (must be > 0)
	BufferSize     int                  // Buffer size applied to all output channels (must be >= 0)
}
PartitionConfig configures partition behavior including strategy and buffer sizing.
type PartitionStrategy ¶
type PartitionStrategy[T any] interface {
	Route(value T, partitionCount int) int // Returns partition index [0, N)
}
PartitionStrategy defines the routing behavior for distributing values across partitions. Implementations must be thread-safe as they may be called concurrently from multiple goroutines. The Route method must return a partition index in range [0, partitionCount).
type Result ¶
type Result[T any] struct { // contains filtered or unexported fields }
Result represents either a successful value or an error in stream processing. This is a proof of concept for unified error handling that eliminates dual-channel patterns. It follows the Result type pattern common in functional programming languages. Metadata support added to carry context through stream processing pipelines.
func AddWindowMetadata ¶
func AddWindowMetadata[T any](result Result[T], meta WindowMetadata) Result[T]
AddWindowMetadata adds complete window metadata to a Result[T].
func NewSuccess ¶
NewSuccess creates a Result containing a successful value.
func (Result[T]) Error ¶
func (r Result[T]) Error() *StreamError[T]
Error returns the StreamError. Returns nil if this Result contains a successful value.
func (Result[T]) GetDurationMetadata ¶
GetDurationMetadata retrieves time.Duration metadata with enhanced type safety.
func (Result[T]) GetIntMetadata ¶
GetIntMetadata retrieves int metadata with enhanced type safety.
func (Result[T]) GetMetadata ¶
GetMetadata retrieves a metadata value by key. Returns the value and true if the key exists, nil and false otherwise. The caller must type-assert the returned value to the expected type.
func (Result[T]) GetStringMetadata ¶
GetStringMetadata retrieves string metadata with enhanced type safety. Returns: (value, found, error) - found=false, error=nil: key not present - found=false, error!=nil: key present but wrong type - found=true, error=nil: successful retrieval.
func (Result[T]) GetTimeMetadata ¶
GetTimeMetadata retrieves time.Time metadata with enhanced type safety.
func (Result[T]) HasMetadata ¶
HasMetadata returns true if this Result contains any metadata.
func (Result[T]) Map ¶
Map applies a function to the value if this Result is successful. If this Result contains an error, returns the error unchanged. Metadata is preserved through successful transformations.
func (Result[T]) MapError ¶
func (r Result[T]) MapError(fn func(*StreamError[T]) *StreamError[T]) Result[T]
MapError applies a function to transform the error if this Result contains an error. If this Result is successful, returns the success value unchanged. Metadata is preserved through error transformations.
func (Result[T]) MetadataKeys ¶
MetadataKeys returns all metadata keys for this Result. Returns empty slice if no metadata present.
func (Result[T]) Value ¶
func (r Result[T]) Value() T
Value returns the successful value. Panics if called on a Result containing an error - always check IsSuccess() first.
func (Result[T]) ValueOr ¶
func (r Result[T]) ValueOr(fallback T) T
ValueOr returns the successful value if present, otherwise returns the fallback.
func (Result[T]) WithMetadata ¶
WithMetadata returns a new Result with the specified metadata key-value pair. This is a thread-safe immutable operation - the original Result is unchanged. Multiple calls can be chained to add multiple metadata entries. Returns error for empty keys to prevent silent failures.
type RoundRobinPartition ¶
type RoundRobinPartition[T any] struct { // contains filtered or unexported fields }
RoundRobinPartition implements counter-based routing that distributes values evenly across partitions. Uses an atomic counter to ensure thread-safe operation without locks.
func (*RoundRobinPartition[T]) Route ¶
func (r *RoundRobinPartition[T]) Route(_ T, partitionCount int) int
Route implements round-robin routing using atomic counter. Thread-safe operation without locks for high performance.
type Sample ¶
type Sample[T any] struct { // contains filtered or unexported fields }
Sample randomly selects items from a stream based on a probability rate. It keeps successful items based on the configured rate (0.0 to 1.0) and always passes through errors unchanged.
Sample is used for:
- Load shedding in high-volume streams
- Creating statistical samples for monitoring
- Random downsampling for performance optimization
- A/B testing traffic distribution
The sampling decision is made independently for each item using cryptographically secure randomness. Items are either kept completely or dropped completely - no modification occurs.
func NewSample ¶
NewSample creates a processor that randomly selects items based on probability. The rate parameter determines the probability (0.0 to 1.0) that each successful item will be kept in the stream. A rate of 0.0 drops all items, 1.0 keeps all items.
Error items are always passed through unchanged regardless of the rate.
When to use:
- High-volume streams needing load reduction
- Statistical sampling for monitoring/analytics
- Performance optimization through data reduction
- Random traffic splitting for testing
- Memory pressure relief in processing pipelines
Example:
// Keep 10% of successful orders for monitoring
monitor := streamz.NewSample[Order](0.1)
// Keep half of metrics for storage optimization
storage := streamz.NewSample[Metric](0.5)
// Load testing with 1% of production traffic
loadTest := streamz.NewSample[Request](0.01).WithName("load-test-sample")
// A/B testing - 50/50 split
groupA := streamz.NewSample[User](0.5)
results := monitor.Process(ctx, input)
for result := range results {
// Approximately 10% of successful items, all errors
processMonitoringData(result)
}
Parameters:
- rate: Probability (0.0-1.0) that successful items will be kept
Returns a new Sample processor. Panics if rate is outside the valid range [0.0, 1.0].
type SessionWindow ¶
type SessionWindow[T any] struct { // contains filtered or unexported fields }
SessionWindow groups Results into dynamic windows based on activity gaps. A new session starts after a period of inactivity (gap), making it ideal for grouping related events that occur in bursts with quiet periods between them.
This version processes Result[T] streams, capturing both successful values and errors within each session for comprehensive session analysis and error correlation within activity periods.
Key characteristics:
- Dynamic duration: Sessions vary based on activity patterns
- Key-based: Multiple concurrent sessions via key extraction
- Activity-driven: Extends with each new item, closes after gap
Performance characteristics:
- Session closure latency: gap/8 average, gap/4 maximum (checked at gap/4 intervals)
- Memory usage: O(active_sessions × items_per_session)
- Processing overhead: Single map lookup per item
- Goroutine usage: 1 goroutine per processor instance (no timer callbacks)
- Session checking frequency: gap/4 (balanced latency vs CPU usage)
func NewSessionWindow ¶
func NewSessionWindow[T any](keyFunc func(Result[T]) string, clock Clock) *SessionWindow[T]
NewSessionWindow creates a processor that groups Results into session-based windows. Sessions are defined by periods of activity separated by gaps of inactivity. The keyFunc extracts a session key from each Result, allowing multiple concurrent sessions. Use the fluent API to configure optional behavior like gap duration.
When to use:
- User activity tracking with error monitoring (web sessions, app usage)
- Grouping related log entries or transactions with failure analysis
- Detecting work patterns with natural breaks and error clustering
- Conversation threading in chat applications with error handling
- Batch processing of related events with failure correlation
- API request session analysis with success/failure rates
Example:
// Group user actions into sessions with error tracking (30-minute default gap)
sessions := streamz.NewSessionWindow(
func(result Result[UserAction]) string {
if result.IsError() {
// Use error context to extract user ID for session grouping
return result.Error().Item.UserID
}
return result.Value().UserID
},
streamz.RealClock,
)
// Custom gap duration for faster session timeout
sessions := streamz.NewSessionWindow(
func(result Result[UserAction]) string {
// Extract user ID from either success or error
if result.IsSuccess() {
return result.Value().UserID
}
return result.Error().Item.UserID
},
streamz.RealClock,
).WithGap(10*time.Minute)
results := sessions.Process(ctx, actionResults)
for result := range results {
// Each result has session metadata attached
if meta, err := streamz.GetWindowMetadata(result); err == nil {
fmt.Printf("Action in session [%s]: %v from %s to %s\n",
*meta.SessionKey,
result.Value(),
meta.Start.Format("15:04:05"),
meta.End.Format("15:04:05"))
}
}
// Collect into sessions for analysis when needed
collector := streamz.NewWindowCollector[UserAction]()
collections := collector.Process(ctx, results)
for collection := range collections {
values := collection.Values() // Successful actions
errors := collection.Errors() // Failed actions
totalActions := collection.Count()
successRate := float64(collection.SuccessCount()) / float64(totalActions) * 100
// Alert on sessions with high error rates
if successRate < 80 && totalActions > 5 {
alert.Send("User session with high error rate", collection)
}
}
Parameters:
- keyFunc: Extracts session identifier from Results (handles both success and errors)
- clock: Clock interface for time operations (use RealClock for production)
Returns a new SessionWindow processor with fluent configuration.
Performance notes:
- Memory scales with number of concurrent sessions
- Session closure latency: gap/8 average, gap/4 maximum
- Best for activity-based grouping with natural boundaries
func (*SessionWindow[T]) Name ¶
func (w *SessionWindow[T]) Name() string
Name returns the processor name for debugging and monitoring.
func (*SessionWindow[T]) Process ¶
func (w *SessionWindow[T]) Process(ctx context.Context, in <-chan Result[T]) <-chan Result[T]
Process groups Results into session-based windows, emitting individual Results with session metadata. Both successful values and errors extend sessions, allowing comprehensive analysis of user behavior, error patterns, and success rates within natural activity periods.
Session behavior:
- New session starts with first Result for a key
- Session extends with each new Result (success or error) for that key
- Session closes after gap duration with no activity
- Multiple concurrent sessions supported via key extraction
- All Results within session timeframe get session metadata attached
Implementation uses single-goroutine architecture with periodic session checking at gap/4 intervals. Enhanced boundary tracking separates session state management from emitted metadata to handle dynamic session extension correctly:
- Average latency: gap/8 (uniformly distributed)
- Maximum latency: gap/4 (worst case)
- Example: 30-minute gap = 3.75 minute average, 7.5 minute max closure delay
Performance and resource usage:
- Memory scales with number of concurrent sessions
- No memory growth within sessions (bounded by activity)
- CPU usage: O(active_sessions) every gap/4 interval
- Thread-safe: Single goroutine prevents timer callback races
- Efficient cleanup: Expired sessions removed immediately
Trade-offs:
- Delayed emission (gap/8 average) vs real-time complexity
- Single goroutine safety vs multiple timer overhead
- Memory for all active sessions vs streaming aggregation
func (*SessionWindow[T]) WithGap ¶
func (w *SessionWindow[T]) WithGap(gap time.Duration) *SessionWindow[T]
WithGap sets the maximum time between Results in the same session. If not set, defaults to 30 minutes.
func (*SessionWindow[T]) WithName ¶
func (w *SessionWindow[T]) WithName(name string) *SessionWindow[T]
WithName sets a custom name for this processor. If not set, defaults to "session-window".
type SlidingWindow ¶
type SlidingWindow[T any] struct { // contains filtered or unexported fields }
SlidingWindow groups Results into overlapping time-based windows. Unlike tumbling windows, sliding windows can overlap, allowing for smooth transitions and rolling calculations over time periods.
This version processes Result[T] streams, capturing both successful values and errors within overlapping time windows for comprehensive monitoring and analysis.
Key characteristics:
- Overlapping: Items can belong to multiple windows
- Configurable slide: Control overlap with slide interval
- Smooth aggregations: Better for trend detection
Performance characteristics:
- Window emission latency: At window.End time (size duration after window.Start)
- Memory usage: O(active_windows × items_per_window) where active_windows = size/slide
- Processing overhead: O(active_windows) map operations per item
- Goroutine usage: 1 goroutine per processor instance
- Overlap factor: size/slide determines number of concurrent windows
func NewSlidingWindow ¶
func NewSlidingWindow[T any](size time.Duration, clock Clock) *SlidingWindow[T]
NewSlidingWindow creates a processor that groups Results into overlapping time windows. Each window has a fixed duration (size) and windows are created at regular intervals (slide). Use the fluent API to configure optional behavior like slide interval.
When to use:
- Computing rolling averages with error rates over time
- Smooth trend analysis with overlapping data points and failure tracking
- Real-time dashboards with continuous updates and health monitoring
- Detecting patterns that might span window boundaries
- Gradual transitions in time-series analysis with error correlation
Example:
// Tumbling window behavior (no overlap) with Result[T]
window := streamz.NewSlidingWindow[Metric](5*time.Minute, streamz.RealClock)
// Overlapping windows: 5-minute window, new window every minute
window := streamz.NewSlidingWindow[Metric](5*time.Minute, streamz.RealClock).
WithSlide(time.Minute)
results := window.Process(ctx, metricResults)
for result := range results {
// Each result has window metadata attached
if meta, err := streamz.GetWindowMetadata(result); err == nil {
fmt.Printf("Metric in sliding window [%s-%s]: %v\n",
meta.Start.Format("15:04"),
meta.End.Format("15:04"),
result.Value())
}
}
// Collect into windows for analysis when needed
collector := streamz.NewWindowCollector[Metric]()
collections := collector.Process(ctx, results)
for collection := range collections {
values := collection.Values() // Successful metrics only
errors := collection.Errors() // Failed metrics only
successRate := float64(collection.SuccessCount()) / float64(collection.Count()) * 100
if successRate < 90 {
alert.Send("Success rate below threshold", collection)
}
}
Parameters:
- size: Duration of each window (must be > 0)
- clock: Clock interface for time operations (use RealClock for production)
Returns a new SlidingWindow processor with fluent configuration.
Performance notes:
- Memory scales with overlap factor (size/slide)
- CPU overhead proportional to number of active windows
- Best for smooth aggregations and trend detection
func (*SlidingWindow[T]) Name ¶
func (w *SlidingWindow[T]) Name() string
Name returns the processor name for debugging and monitoring.
func (*SlidingWindow[T]) Process ¶
func (w *SlidingWindow[T]) Process(ctx context.Context, in <-chan Result[T]) <-chan Result[T]
Process groups Results into overlapping time windows, emitting individual Results with window metadata. Results can belong to multiple windows if they overlap. Both successful values and errors are captured with their window context, enabling comprehensive analysis of patterns across overlapping time periods.
Window behavior:
- Each Result gets window metadata attached (start, end, type, size, slide)
- Results are assigned to all overlapping windows they belong to
- Results are emitted when their windows expire (current time > window end)
- All Results (success and errors) within the window timeframe are included
Performance and resource usage:
- Memory usage scales with overlap: up to size/slide concurrent windows
- Each item is duplicated in all overlapping windows (by reference)
- Emission checked at slide intervals via ticker
- Special optimization: When slide == size, uses efficient tumbling mode
- Thread-safe: Single goroutine architecture prevents races
Trade-offs:
- Higher memory usage than tumbling windows due to overlap
- More CPU per item (checking multiple windows)
- Smoother aggregations and trend detection
- Better for detecting patterns spanning window boundaries
func (*SlidingWindow[T]) WithName ¶
func (w *SlidingWindow[T]) WithName(name string) *SlidingWindow[T]
WithName sets a custom name for this processor. If not set, defaults to "sliding-window".
func (*SlidingWindow[T]) WithSlide ¶
func (w *SlidingWindow[T]) WithSlide(slide time.Duration) *SlidingWindow[T]
WithSlide sets the slide interval for creating new windows. If not set, defaults to the window size (tumbling window behavior).
Parameters:
- slide: Time interval between window starts. If smaller than size, windows overlap.
Returns the SlidingWindow for method chaining.
type StreamError ¶
type StreamError[T any] struct { // Item is the original item that caused the processing error. Item T // Err is the underlying error that occurred during processing. Err error // ProcessorName identifies which processor generated the error. ProcessorName string // Timestamp records when the error occurred. Timestamp time.Time }
StreamError represents an error that occurred during stream processing. It captures both the item that caused the error and the error itself, enabling better debugging and error handling strategies.
func NewStreamError ¶
func NewStreamError[T any](item T, err error, processorName string) *StreamError[T]
NewStreamError creates a new StreamError with the current timestamp.
func (*StreamError[T]) Error ¶
func (se *StreamError[T]) Error() string
Error implements the error interface.
func (*StreamError[T]) String ¶
func (se *StreamError[T]) String() string
String returns a human-readable representation of the error.
func (*StreamError[T]) Unwrap ¶
func (se *StreamError[T]) Unwrap() error
Unwrap returns the underlying error, enabling error wrapping chains.
type Switch ¶
type Switch[T any, K comparable] struct { // contains filtered or unexported fields }
Switch routes Result[T] to multiple output channels based on predicate evaluation. Errors bypass predicate evaluation and go directly to the error channel. Successful values are evaluated by the predicate to determine routing.
func NewSwitch ¶
func NewSwitch[T any, K comparable](predicate func(T) K, config SwitchConfig[K]) *Switch[T, K]
NewSwitch creates a Switch with full configuration options.
func NewSwitchSimple ¶
func NewSwitchSimple[T any, K comparable](predicate func(T) K) *Switch[T, K]
NewSwitchSimple creates a Switch with default configuration (unbuffered, no default route).
func (*Switch[T, K]) ErrorChannel ¶
ErrorChannel returns read-only access to the error channel.
func (*Switch[T, K]) Process ¶
func (s *Switch[T, K]) Process(ctx context.Context, in <-chan Result[T]) (routes map[K]<-chan Result[T], errors <-chan Result[T])
Process routes input Results to output channels based on predicate evaluation. Returns read-only channel maps for routes and errors. All channels are closed when processing completes or context is canceled.
func (*Switch[T, K]) RemoveRoute ¶
RemoveRoute removes a route and closes its channel.
type SwitchConfig ¶
type SwitchConfig[K comparable] struct { DefaultKey *K // Route for unknown predicate results (nil = drop) BufferSize int // Per-route channel buffer size (0 = unbuffered) }
SwitchConfig configures Switch behavior.
type Tap ¶
type Tap[T any] struct { // contains filtered or unexported fields }
Tap executes a side effect function for each item while passing items through unchanged. It's used for logging, debugging, monitoring, metrics collection, and any other observational operations that shouldn't modify the data flow.
Tap is the simplest processor in streamz - it observes without interfering. The side effect function is called for every item (both success and error cases) but has no effect on what gets passed to the next stage of the pipeline.
func NewTap ¶
NewTap creates a processor that executes a side effect function on each Result[T] while passing all items through unchanged. The side effect function receives the complete Result[T], allowing it to handle both success and error cases.
When to use:
- Debug logging and tracing
- Metrics collection and monitoring
- Audit trails and compliance logging
- Performance monitoring and profiling
- Testing and verification
- Side effect operations that don't modify data
Example:
// Simple logging
logger := streamz.NewTap(func(result Result[Order]) {
if result.IsError() {
log.Printf("Error processing order: %v", result.Error())
} else {
log.Printf("Order processed: %+v", result.Value())
}
})
// Metrics collection
var processedCount, errorCount atomic.Int64
metrics := streamz.NewTap(func(result Result[Order]) {
if result.IsError() {
errorCount.Add(1)
} else {
processedCount.Add(1)
}
})
// Debug at specific pipeline stage
debug := streamz.NewTap(func(result Result[Order]) {
if result.IsSuccess() {
fmt.Printf("DEBUG: Order %s at validation stage\n",
result.Value().ID)
}
}).WithName("validation-debug")
results := logger.Process(ctx, input)
for result := range results {
// Side effects executed, items unchanged
fmt.Printf("Result: %+v\n", result)
}
Parameters:
- fn: Side effect function that receives each Result[T]
Returns a new Tap processor.
func (*Tap[T]) Process ¶
Process executes the side effect function on each item while passing all items through unchanged. Both successful values and errors are observed and forwarded. The side effect function is called with the complete Result[T], allowing it to distinguish between success and error cases.
type Throttle ¶
type Throttle[T any] struct { // contains filtered or unexported fields }
Throttle limits the rate of items passing through the stream using leading edge behavior. It emits the first item immediately and then ignores subsequent items for a cooldown period. Errors are passed through immediately without throttling.
Concurrent Behavior: Multiple goroutines may call Process() on the same Throttle instance. The throttling state (lastEmit) is shared across all Process() calls.
func NewThrottle ¶
NewThrottle creates a processor that implements leading edge throttling. The first item is emitted immediately, then subsequent items are ignored until the cooldown period expires. Errors are passed through immediately.
When to use:
- Prevent overwhelming downstream services with rapid requests
- Implement "first action wins" behavior for rapid user interactions
- Rate limiting API calls with immediate first response
- Controlling burst traffic patterns
Example:
// Throttle button clicks - only first click processed per 500ms
throttle := streamz.NewThrottle[ClickEvent](500 * time.Millisecond, streamz.RealClock)
processed := throttle.Process(ctx, clicks)
// Throttle API requests - first request immediate, others wait
throttle := streamz.NewThrottle[APIRequest](time.Second, streamz.RealClock)
limited := throttle.Process(ctx, requests)
Parameters:
- duration: The cooldown period during which subsequent items are ignored. If duration is 0, all items pass through without throttling.
- clock: Clock interface for time operations
func (*Throttle[T]) Process ¶
Process throttles the input stream using leading edge behavior. The first item is emitted immediately, then subsequent items are ignored until the cooldown period expires. Errors are passed through immediately. Uses timestamp comparison instead of timer goroutines for race-free operation.
type TumblingWindow ¶
type TumblingWindow[T any] struct { // contains filtered or unexported fields }
TumblingWindow groups items into fixed-size, non-overlapping time windows. Each item gets window metadata attached and is emitted when its window expires, making it ideal for time-based aggregations with Result[T] metadata flow.
This version processes Result[T] streams, attaching window metadata to each individual Result for comprehensive monitoring and downstream processing.
Key characteristics:
- Non-overlapping: Each item belongs to exactly one window
- Fixed duration: All windows have the same size
- Metadata-driven: Results carry window context via metadata
- Predictable emission: Results emit at exact window boundaries
Performance characteristics:
- Result emission latency: Exactly at window boundary (size duration)
- Memory usage: O(items_per_window) - bounded by window size
- Processing overhead: Metadata attachment per item
- Goroutine usage: 1 goroutine per processor instance
- No unbounded memory growth - results are emitted and cleared
func NewTumblingWindow ¶
func NewTumblingWindow[T any](size time.Duration, clock Clock) *TumblingWindow[T]
NewTumblingWindow creates a processor that groups Results into fixed-size time windows. Unlike sliding windows, tumbling windows don't overlap - each Result belongs to exactly one window. Windows are emitted when their time period expires.
When to use:
- Time-based aggregations with error tracking (hourly stats, daily summaries)
- Periodic batch processing with failure monitoring
- Rate calculations over fixed intervals
- Log analysis and error reporting over time periods
- Metrics collection with success/failure rates
Example:
// Process events with 1-minute window metadata
window := streamz.NewTumblingWindow[Event](time.Minute, streamz.RealClock)
results := window.Process(ctx, eventResults)
for result := range results {
// Each result now has window metadata attached
if meta, err := streamz.GetWindowMetadata(result); err == nil {
fmt.Printf("Event in window [%s - %s]: %v\n",
meta.Start.Format("15:04:05"),
meta.End.Format("15:04:05"),
result.Value())
}
}
// Collect into traditional windows when needed
collector := streamz.NewWindowCollector[Event]()
collections := collector.Process(ctx, results)
for collection := range collections {
values := collection.Values() // Only successful events
errors := collection.Errors() // Only errors
generateReport(values, errors)
}
Parameters:
- size: Duration of each window (must be > 0)
- clock: Clock interface for time operations (use RealClock for production)
Returns a new TumblingWindow processor for time-based grouping with Result[T] support.
Performance notes:
- Optimal for non-overlapping aggregations
- Minimal memory overhead (single active window)
- Predictable latency: exactly window size duration
func (*TumblingWindow[T]) Name ¶
func (w *TumblingWindow[T]) Name() string
Name returns the processor name for debugging and monitoring.
func (*TumblingWindow[T]) Process ¶
func (w *TumblingWindow[T]) Process(ctx context.Context, in <-chan Result[T]) <-chan Result[T]
Process groups Results into fixed-size time windows, emitting individual Results with window metadata. Both successful values and errors are captured with their window context, enabling comprehensive error tracking and success rate monitoring over time periods.
Window behavior:
- Each Result gets window metadata attached (start, end, type, size)
- Results are emitted exactly at their window boundary expiration
- Empty windows produce no output
- On context cancellation or input close, partial windows emit their Results if non-empty
Performance and resource usage:
- Zero allocation for window tracking (single active window)
- Predictable memory: size = items_per_window × average_item_size
- Latency: Items buffered for up to window size duration
- Thread-safe: Single goroutine architecture prevents races
func (*TumblingWindow[T]) WithName ¶
func (w *TumblingWindow[T]) WithName(name string) *TumblingWindow[T]
WithName sets a custom name for this processor.
type WindowCollection ¶
type WindowCollection[T any] struct { Start time.Time End time.Time Meta WindowMetadata Results []Result[T] }
WindowCollection represents aggregated results from a single window.
func (WindowCollection[T]) Count ¶
func (wc WindowCollection[T]) Count() int
Count returns the total number of results in the window collection.
func (WindowCollection[T]) ErrorCount ¶
func (wc WindowCollection[T]) ErrorCount() int
ErrorCount returns the number of error results in the window collection.
func (WindowCollection[T]) Errors ¶
func (wc WindowCollection[T]) Errors() []*StreamError[T]
Errors returns all errors from the window collection.
func (WindowCollection[T]) SuccessCount ¶
func (wc WindowCollection[T]) SuccessCount() int
SuccessCount returns the number of successful results in the window collection.
func (WindowCollection[T]) Values ¶
func (wc WindowCollection[T]) Values() []T
Values returns all successful values from the window collection.
type WindowCollector ¶
type WindowCollector[T any] struct { // contains filtered or unexported fields }
WindowCollector aggregates Results with matching window metadata.
func NewWindowCollector ¶
func NewWindowCollector[T any]() *WindowCollector[T]
NewWindowCollector creates a new WindowCollector for aggregating Results by window.
func (*WindowCollector[T]) Process ¶
func (c *WindowCollector[T]) Process(ctx context.Context, in <-chan Result[T]) <-chan WindowCollection[T]
Process aggregates Results with matching window metadata into WindowCollections. Uses struct-based keys to eliminate string allocation overhead for high performance.
type WindowConfig ¶
type WindowConfig struct {
// Size is the duration of each window.
Size time.Duration
// Slide is the slide interval for sliding windows.
// If 0 or equal to Size, windows don't overlap (tumbling windows).
// If less than Size, windows overlap (sliding windows).
Slide time.Duration
// MaxCount is the maximum number of items per window.
// If 0, there's no item count limit.
MaxCount int
}
WindowConfig configures windowing behavior for window processors.
type WindowInfo ¶
type WindowInfo struct {
Size time.Duration
Slide *time.Duration
Gap *time.Duration
SessionKey *string
Start time.Time
End time.Time
Type WindowType
}
WindowInfo provides type-safe access to window metadata.
func GetWindowInfo ¶
func GetWindowInfo[T any](result Result[T]) (WindowInfo, error)
GetWindowInfo extracts and validates window metadata with enhanced type safety.
type WindowMetadata ¶
type WindowMetadata struct {
Size time.Duration
Slide *time.Duration
Gap *time.Duration
SessionKey *string
Start time.Time
End time.Time
Type string
}
WindowMetadata encapsulates window-related metadata operations.
func GetWindowMetadata ¶
func GetWindowMetadata[T any](result Result[T]) (WindowMetadata, error)
GetWindowMetadata extracts window metadata from a Result[T].
type WindowType ¶
type WindowType string
WindowType represents the type of window.
const ( WindowTypeTumbling WindowType = "tumbling" WindowTypeSliding WindowType = "sliding" WindowTypeSession WindowType = "session" )
Window type constants.