package stream

v0.0.0-...-4ae7bce

Published: Sep 23, 2025 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

var EOS = errors.New("end of stream")

EOS signals the end of a stream
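
For illustration, a minimal hand-rolled stream that yields three values and then signals completion with EOS. This is a sketch; the module import path is elided above, so the stream qualifier is assumed:

	// countdown returns a Stream[int64] that yields 3, 2, 1, then EOS.
	func countdown() stream.Stream[int64] {
		n := int64(3)
		return func() (int64, error) {
			if n == 0 {
				return 0, stream.EOS // end of stream
			}
			v := n
			n--
			return v, nil
		}
	}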

Functions

func AggregateMultiple

func AggregateMultiple[T any, A1, A2, R1, R2 any](
	stream Stream[T],
	agg1 Aggregator[T, A1, R1],
	agg2 Aggregator[T, A2, R2],
) (R1, R2, error)

AggregateMultiple runs multiple aggregators in parallel using Tee
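
A sketch of computing a sum and a count over one stream in a single pass (values are illustrative; assumes the package is imported as stream):

	total, count, err := stream.AggregateMultiple(
		stream.FromSlice([]int64{1, 2, 3, 4}),
		stream.SumAggregator(func(v int64) int64 { return v }), // identity extraction
		stream.CountAggregator[int64](),
	)
	// on success: total == 10, count == 4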

func AggregateTriple

func AggregateTriple[T any, A1, A2, A3, R1, R2, R3 any](
	stream Stream[T],
	agg1 Aggregator[T, A1, R1],
	agg2 Aggregator[T, A2, R2],
	agg3 Aggregator[T, A3, R3],
) (R1, R2, R3, error)

AggregateTriple runs three aggregators in parallel

func AggregateWith

func AggregateWith[T, A, R any](stream Stream[T], agg Aggregator[T, A, R]) (R, error)

AggregateWith runs a single custom aggregator on a stream

func Avg

func Avg[T Numeric](stream Stream[T]) (float64, error)

Avg calculates average of numeric values
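
For example (a sketch, assuming the package is imported as stream):

	avg, err := stream.Avg(stream.From(10.0, 20.0, 30.0))
	// on success: avg == 20.0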

func Collect

func Collect[T any](stream Stream[T]) ([]T, error)

Collect gathers all stream elements into a slice

func Count

func Count[T any](stream Stream[T]) (int64, error)

Count counts elements

func EventTimeSessionWindow

func EventTimeSessionWindow(
	sessionTimeout time.Duration,
	options ...EventTimeWindowOption,
) func(Stream[Record]) Stream[Stream[Record]]

EventTimeSessionWindow creates session windows based on event time for Records

func EventTimeSlidingWindow

func EventTimeSlidingWindow(
	windowSize time.Duration,
	slideInterval time.Duration,
	options ...EventTimeWindowOption,
) func(Stream[Record]) Stream[Stream[Record]]

EventTimeSlidingWindow creates sliding windows based on event time for Records

func EventTimeTumblingWindow

func EventTimeTumblingWindow(
	windowSize time.Duration,
	options ...EventTimeWindowOption,
) func(Stream[Record]) Stream[Stream[Record]]

EventTimeTumblingWindow creates tumbling windows based on event time for Records
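
A sketch of one-minute tumbling windows keyed on an assumed "ts" field, with a bounded-out-of-orderness watermark (the field name and the events stream are illustrative):

	windows := stream.EventTimeTumblingWindow(
		time.Minute,
		stream.WithTimestampExtractor(stream.NewRecordTimestampExtractor("ts")),
		stream.WithWatermarkGenerator(stream.BoundedOutOfOrdernessWatermark(30*time.Second)),
	)(events) // events is an assumed Stream[Record]
	for {
		w, err := windows()
		if err != nil {
			break // EOS ends the outer stream
		}
		rows, _ := stream.Collect(w) // one fired window's records
		_ = rows
	}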

func ForEach

func ForEach[T any](fn func(T)) func(Stream[T]) error

ForEach executes a function for each element

func Get

func Get[T any](r Record, field string) (T, bool)

Get retrieves a typed value from a record with automatic conversion

func GetOr

func GetOr[T any](r Record, field string, defaultVal T) T

GetOr retrieves a typed value with a default fallback
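
For example, on a record built inline (a sketch; Record is a map[string]any, so a composite literal works):

	r := stream.Record{"name": "Alice", "age": int64(30)}
	age, ok := stream.Get[int64](r, "age")     // 30, true
	city := stream.GetOr(r, "city", "unknown") // "unknown": field is absent
	_, _, _ = age, ok, city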

func IsStreamType

func IsStreamType(value any) bool

IsStreamType checks if a value is a Stream[T] type

func Max

func Max[T Comparable](stream Stream[T]) (T, error)

Max finds maximum value

func Min

func Min[T Comparable](stream Stream[T]) (T, error)

Min finds minimum value

func ParseStandardTime

func ParseStandardTime(val any) (time.Time, bool)

ParseStandardTime parses various time formats and returns standardized UTC time

func RunAggregator

func RunAggregator[T, A, R any](stream Stream[T], agg Aggregator[T, A, R]) (R, error)

RunAggregator runs a single aggregator on a stream and returns the result

func StandardizeTime

func StandardizeTime(t time.Time) time.Time

StandardizeTime converts any time value to UTC and rounds it to the nearest second, for consistent time comparisons across different timezones and precisions

func StandardizeTimeNano

func StandardizeTimeNano(t time.Time) time.Time

StandardizeTimeNano converts a time to UTC and rounds it to the nearest nanosecond, for high-precision time operations while ensuring timezone consistency

func StreamToCSV

func StreamToCSV(stream Stream[Record], writer io.Writer) error

StreamToCSV writes a Record stream to a CSV writer

func StreamToCSVFile

func StreamToCSVFile(stream Stream[Record], filename string) error

func StreamToJSON

func StreamToJSON(stream Stream[Record], writer io.Writer) error

StreamToJSON writes a Record stream to a JSON writer (defaults to JSON Lines)

func StreamToJSONFile

func StreamToJSONFile(stream Stream[Record], filename string) error

func StreamToProtobuf

func StreamToProtobuf(stream Stream[Record], writer io.Writer, messageDesc protoreflect.MessageDescriptor) error

StreamToProtobuf writes a Record stream to a protobuf writer

func StreamToProtobufFile

func StreamToProtobufFile(stream Stream[Record], filename string, messageDesc protoreflect.MessageDescriptor) error

func StreamToTSV

func StreamToTSV(stream Stream[Record], writer io.Writer) error

StreamToTSV writes a Record stream to a TSV writer

func StreamToTSVFile

func StreamToTSVFile(stream Stream[Record], filename string) error

func Sum

func Sum[T Numeric](stream Stream[T]) (T, error)

Sum aggregates numeric values

Types

type AccumulationMode

type AccumulationMode int

AccumulationMode defines how late data is handled

const (
	DiscardingMode AccumulationMode = iota // Discard late data
	AccumulateMode                         // Accumulate late data into existing windows
)

type ActivityDetector

type ActivityDetector[T any] func(T) bool

ActivityDetector determines if an element represents activity for session windows

type AdvancedCountTrigger

type AdvancedCountTrigger[T any] struct {
	// contains filtered or unexported fields
}

AdvancedCountTrigger fires when element count reaches threshold

func NewAdvancedCountTrigger

func NewAdvancedCountTrigger[T any](threshold int) *AdvancedCountTrigger[T]

func (*AdvancedCountTrigger[T]) GetType

func (t *AdvancedCountTrigger[T]) GetType() AdvancedTriggerType

func (*AdvancedCountTrigger[T]) Reset

func (t *AdvancedCountTrigger[T]) Reset()

func (*AdvancedCountTrigger[T]) ShouldFire

func (t *AdvancedCountTrigger[T]) ShouldFire(window *WindowState[T], element T, processingTime time.Time) bool

type AdvancedProcessingTimeTrigger

type AdvancedProcessingTimeTrigger[T any] struct {
	// contains filtered or unexported fields
}

AdvancedProcessingTimeTrigger fires at regular processing time intervals

func NewAdvancedProcessingTimeTrigger

func NewAdvancedProcessingTimeTrigger[T any](interval time.Duration) *AdvancedProcessingTimeTrigger[T]

func (*AdvancedProcessingTimeTrigger[T]) GetType

func (*AdvancedProcessingTimeTrigger[T]) Reset

func (t *AdvancedProcessingTimeTrigger[T]) Reset()

func (*AdvancedProcessingTimeTrigger[T]) ShouldFire

func (t *AdvancedProcessingTimeTrigger[T]) ShouldFire(window *WindowState[T], element T, processingTime time.Time) bool

type AdvancedTimeTrigger

type AdvancedTimeTrigger[T any] struct {
	// contains filtered or unexported fields
}

AdvancedTimeTrigger fires after a duration since window start

func NewAdvancedTimeTrigger

func NewAdvancedTimeTrigger[T any](duration time.Duration) *AdvancedTimeTrigger[T]

func (*AdvancedTimeTrigger[T]) GetType

func (t *AdvancedTimeTrigger[T]) GetType() AdvancedTriggerType

func (*AdvancedTimeTrigger[T]) Reset

func (t *AdvancedTimeTrigger[T]) Reset()

func (*AdvancedTimeTrigger[T]) ShouldFire

func (t *AdvancedTimeTrigger[T]) ShouldFire(window *WindowState[T], element T, processingTime time.Time) bool

type AdvancedTrigger

type AdvancedTrigger[T any] interface {
	// ShouldFire returns true if the window should fire based on current state
	ShouldFire(window *WindowState[T], element T, processingTime time.Time) bool

	// Reset resets the trigger state (called after firing)
	Reset()

	// GetType returns the trigger type for optimization
	GetType() AdvancedTriggerType
}

AdvancedTrigger defines when a window should fire/emit results in advanced windowing

type AdvancedTriggerType

type AdvancedTriggerType int

AdvancedTriggerType defines different trigger conditions for advanced windowing

const (
	AdvancedCountTriggerType AdvancedTriggerType = iota
	AdvancedTimeTriggerType
	AdvancedProcessingTimeTriggerType
	AdvancedCustomTriggerType
)

type Aggregator

type Aggregator[T, A, R any] struct {
	Initial    func() A     // Create initial accumulator
	Accumulate func(A, T) A // Process each element
	Finalize   func(A) R    // Produce final result
}

Aggregator defines a composable aggregation operation
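
A sketch of a custom aggregator that keeps the longest string seen, run with AggregateWith (all names are illustrative):

	longest := stream.Aggregator[string, string, string]{
		Initial: func() string { return "" },
		Accumulate: func(acc string, s string) string {
			if len(s) > len(acc) {
				return s
			}
			return acc
		},
		Finalize: func(acc string) string { return acc },
	}
	result, err := stream.AggregateWith(stream.From("a", "ccc", "bb"), longest)
	// on success: result == "ccc"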

func AvgAggregator

func AvgAggregator[I any, T Numeric](extract func(I) T) Aggregator[I, [2]float64, float64]

AvgAggregator creates an average aggregator with custom value extraction

func AvgAggregatorField

func AvgAggregatorField[T Numeric](fieldName string) Aggregator[Record, [2]float64, float64]

AvgAggregatorField creates an aggregator that averages a numeric field in records

func CountAggregator

func CountAggregator[I any]() Aggregator[I, int64, int64]

CountAggregator creates a count aggregator (doesn't need value extraction)

func CountAggregatorField

func CountAggregatorField(fieldName string) Aggregator[Record, int64, int64]

CountAggregatorField creates an aggregator that counts records (field name is ignored but maintained for consistency)

func MaxAggregator

func MaxAggregator[I any, T Comparable](extract func(I) T) Aggregator[I, *T, T]

MaxAggregator creates a max aggregator with custom value extraction

func MaxAggregatorField

func MaxAggregatorField[T Comparable](fieldName string) Aggregator[Record, *T, T]

MaxAggregatorField creates an aggregator that finds the maximum of a field in records

func MinAggregator

func MinAggregator[I any, T Comparable](extract func(I) T) Aggregator[I, *T, T]

MinAggregator creates a min aggregator with custom value extraction

func MinAggregatorField

func MinAggregatorField[T Comparable](fieldName string) Aggregator[Record, *T, T]

MinAggregatorField creates an aggregator that finds the minimum of a field in records

func SumAggregator

func SumAggregator[I any, T Numeric](extract func(I) T) Aggregator[I, T, T]

SumAggregator creates a sum aggregator with custom value extraction

func SumAggregatorField

func SumAggregatorField[T Numeric](fieldName string) Aggregator[Record, T, T]

SumAggregatorField creates an aggregator that sums a numeric field in records

type AggregatorSpec

type AggregatorSpec[T any] struct {
	Name string
	Agg  interface{} // Type-erased aggregator
}

AggregatorSpec represents a named aggregator specification

func AvgField

func AvgField[T Numeric](name, fieldName string) AggregatorSpec[Record]

AvgField creates an aggregator that averages a numeric field in records

func AvgStream

func AvgStream[T Numeric](name string) AggregatorSpec[T]

func CountField

func CountField(name, fieldName string) AggregatorSpec[Record]

CountField creates an aggregator that counts records (field name is ignored but maintained for consistency)

func CountStream

func CountStream[T any](name string) AggregatorSpec[T]

func CustomSpec

func CustomSpec[T, A, R any](name string, agg Aggregator[T, A, R]) AggregatorSpec[T]

CustomSpec creates a spec for any custom aggregator

func MaxField

func MaxField[T Comparable](name, fieldName string) AggregatorSpec[Record]

MaxField creates an aggregator that finds the maximum of a field in records

func MaxStream

func MaxStream[T Comparable](name string) AggregatorSpec[T]

func MinField

func MinField[T Comparable](name, fieldName string) AggregatorSpec[Record]

MinField creates an aggregator that finds the minimum of a field in records

func MinStream

func MinStream[T Comparable](name string) AggregatorSpec[T]

func SumField

func SumField[T Numeric](name, fieldName string) AggregatorSpec[Record]

SumField creates an aggregator that sums a numeric field in records

func SumStream

func SumStream[T Numeric](name string) AggregatorSpec[T]

Helper functions to create aggregator specs

type BoolStream

type BoolStream = Stream[bool]

type CPUExecutor

type CPUExecutor struct {
	// contains filtered or unexported fields
}

CPUExecutor implements stream operations using CPU-based processing

func NewCPUExecutor

func NewCPUExecutor() *CPUExecutor

NewCPUExecutor creates a new CPU executor

func (*CPUExecutor) CanHandle

func (e *CPUExecutor) CanHandle(op Operation, ctx ExecutionContext) bool

CanHandle always returns true; the CPU executor can handle any operation

func (*CPUExecutor) ExecuteFilter

func (e *CPUExecutor) ExecuteFilter(predicate interface{}, input interface{}, ctx ExecutionContext) interface{}

ExecuteFilter performs filter operation using CPU

func (*CPUExecutor) ExecuteMap

func (e *CPUExecutor) ExecuteMap(fn interface{}, input interface{}, ctx ExecutionContext) interface{}

ExecuteMap performs map operation using CPU

func (*CPUExecutor) ExecuteParallel

func (e *CPUExecutor) ExecuteParallel(workers int, fn interface{}, input interface{}, ctx ExecutionContext) interface{}

ExecuteParallel performs parallel processing using CPU

func (*CPUExecutor) ExecuteReduce

func (e *CPUExecutor) ExecuteReduce(fn interface{}, initial interface{}, input interface{}, ctx ExecutionContext) interface{}

ExecuteReduce performs reduce operation using CPU

func (*CPUExecutor) GetScore

func (e *CPUExecutor) GetScore(op Operation, ctx ExecutionContext) int

GetScore returns a score for how well CPU handles the operation

func (*CPUExecutor) Name

func (e *CPUExecutor) Name() string

Name returns the executor name

type CSVSink

type CSVSink struct {
	Writer    io.Writer
	Separator rune
	Headers   []string
	// contains filtered or unexported fields
}

CSVSink configuration for writing CSV data

func NewCSVSink

func NewCSVSink(writer io.Writer) *CSVSink

NewCSVSink creates a CSV sink to a writer

func NewCSVSinkToFile

func NewCSVSinkToFile(filename string) (*CSVSink, error)

NewCSVSinkToFile creates a CSV sink to a file

func NewTSVSink

func NewTSVSink(writer io.Writer) *CSVSink

NewTSVSink creates a TSV sink to a writer

func NewTSVSinkToFile

func NewTSVSinkToFile(filename string) (*CSVSink, error)

NewTSVSinkToFile creates a TSV sink to a file

func (*CSVSink) WithHeaders

func (sink *CSVSink) WithHeaders(headers []string) *CSVSink

WithHeaders sets the headers for CSV output

func (*CSVSink) WriteRecords

func (sink *CSVSink) WriteRecords(records []Record) error

WriteRecords writes a slice of Records to CSV format

func (*CSVSink) WriteStream

func (sink *CSVSink) WriteStream(stream Stream[Record]) error

WriteStream writes a Record stream to CSV format

type CSVSource

type CSVSource struct {
	Reader    io.Reader
	HasHeader bool
	Separator rune
	Headers   []string
}

CSVSource configuration for reading CSV data

func NewCSVSource

func NewCSVSource(reader io.Reader) *CSVSource

NewCSVSource creates a CSV source from a reader

func NewCSVSourceFromFile

func NewCSVSourceFromFile(filename string) (*CSVSource, error)

NewCSVSourceFromFile creates a CSV source from a file

func NewTSVSource

func NewTSVSource(reader io.Reader) *CSVSource

NewTSVSource creates a TSV (tab-separated values) source using the CSV parser. Use NewFastTSVSource for better performance if you don't need CSV escaping

func NewTSVSourceFromFile

func NewTSVSourceFromFile(filename string) (*CSVSource, error)

NewTSVSourceFromFile creates a TSV source from a file

func (*CSVSource) ToStream

func (cs *CSVSource) ToStream() Stream[Record]

ToStream converts CSV data to a Record stream

func (*CSVSource) WithHeaders

func (cs *CSVSource) WithHeaders(headers []string) *CSVSource

WithHeaders sets custom headers for the CSV

func (*CSVSource) WithoutHeaders

func (cs *CSVSource) WithoutHeaders() *CSVSource

WithoutHeaders configures CSV to not expect headers
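
Putting the pieces together, a sketch of reading CSV text into records (the input data is illustrative; strings is the standard library package):

	src := stream.NewCSVSource(strings.NewReader("name,age\nAlice,30\nBob,25\n"))
	records, err := stream.Collect(src.ToStream())
	// on success, records is a []Record with fields "name" and "age"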

type Comparable

type Comparable interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
		~float32 | ~float64 | ~string
}

Comparable constraint for ordering operations

type CountTrigger

type CountTrigger[T any] struct {
	N int
	// contains filtered or unexported fields
}

CountTrigger fires every N elements

func NewCountTrigger

func NewCountTrigger[T any](n int) *CountTrigger[T]

func (*CountTrigger[T]) ResetState

func (ct *CountTrigger[T]) ResetState() any

func (*CountTrigger[T]) ShouldFire

func (ct *CountTrigger[T]) ShouldFire(element T, state any) bool

type EventTimeSessionState

type EventTimeSessionState struct {
	// contains filtered or unexported fields
}

EventTimeSessionState tracks the state of an event-time session window

func NewEventTimeSessionState

func NewEventTimeSessionState(policy LateDataPolicy) *EventTimeSessionState

NewEventTimeSessionState creates a new session state

func (*EventTimeSessionState) AddElement

func (ss *EventTimeSessionState) AddElement(element Record, eventTime time.Time)

AddElement adds an element to the session

func (*EventTimeSessionState) Fire

func (ss *EventTimeSessionState) Fire() []Record

Fire triggers the session to emit results

func (*EventTimeSessionState) HasFired

func (ss *EventTimeSessionState) HasFired() bool

HasFired returns true if the session has already fired

func (*EventTimeSessionState) ShouldFire

func (ss *EventTimeSessionState) ShouldFire(watermark time.Time, sessionTimeout time.Duration) bool

ShouldFire determines if the session should fire based on timeout and watermark

type EventTimeWindowConfig

type EventTimeWindowConfig struct {
	TimestampExtractor RecordTimestampExtractor
	WatermarkGenerator WatermarkGenerator
	LateDataPolicy     LateDataPolicy
	AllowedLateness    time.Duration
}

EventTimeWindowConfig holds configuration for event-time windows on Records

type EventTimeWindowOption

type EventTimeWindowOption func(*EventTimeWindowConfig)

EventTimeWindowOption configures event-time windows

func WithAllowedLateness

func WithAllowedLateness(lateness time.Duration) EventTimeWindowOption

WithAllowedLateness sets the maximum allowed lateness (convenience for watermark generation)

func WithLateDataPolicy

func WithLateDataPolicy(policy LateDataPolicy) EventTimeWindowOption

WithLateDataPolicy sets how late data should be handled

func WithTimestampExtractor

func WithTimestampExtractor(extractor RecordTimestampExtractor) EventTimeWindowOption

WithTimestampExtractor sets the timestamp extraction function

func WithWatermarkGenerator

func WithWatermarkGenerator(generator WatermarkGenerator) EventTimeWindowOption

WithWatermarkGenerator sets the watermark generation strategy

type EventTimeWindowState

type EventTimeWindowState struct {
	// contains filtered or unexported fields
}

EventTimeWindowState tracks the state of an event-time window for Records

func NewEventTimeWindowState

func NewEventTimeWindowState(start, end time.Time, policy LateDataPolicy) *EventTimeWindowState

NewEventTimeWindowState creates a new event-time window state for Records

func (*EventTimeWindowState) AddElement

func (ws *EventTimeWindowState) AddElement(element Record, eventTime time.Time) bool

AddElement adds an element to the window if it belongs to this window

func (*EventTimeWindowState) Fire

func (ws *EventTimeWindowState) Fire() []Record

Fire triggers the window to emit results

func (*EventTimeWindowState) HasFired

func (ws *EventTimeWindowState) HasFired() bool

HasFired returns true if the window has already fired

func (*EventTimeWindowState) ShouldFire

func (ws *EventTimeWindowState) ShouldFire(watermark time.Time) bool

ShouldFire determines if the window should fire based on watermark

type ExecutionContext

type ExecutionContext struct {
	Ctx           context.Context
	MaxMemory     int64 // Available memory limit
	MaxGoroutines int   // CPU parallelism limit
	GPUMemory     int64 // Available GPU memory (0 if no GPU)
	BatchSize     int   // Preferred batch size for vectorization
}

ExecutionContext provides runtime information for operation execution

type Executor

type Executor interface {
	// CanHandle returns true if this executor can efficiently handle the operation
	CanHandle(op Operation, ctx ExecutionContext) bool

	// GetScore returns a score (0-100) indicating how well this executor handles the operation
	// Higher scores indicate better suitability
	GetScore(op Operation, ctx ExecutionContext) int

	// ExecuteMap performs map operation with this executor
	ExecuteMap(fn interface{}, input interface{}, ctx ExecutionContext) interface{}

	// ExecuteFilter performs filter operation with this executor
	ExecuteFilter(predicate interface{}, input interface{}, ctx ExecutionContext) interface{}

	// ExecuteReduce performs reduce operation with this executor
	ExecuteReduce(fn interface{}, initial interface{}, input interface{}, ctx ExecutionContext) interface{}

	// ExecuteParallel performs parallel processing with this executor
	ExecuteParallel(workers int, fn interface{}, input interface{}, ctx ExecutionContext) interface{}

	// Name returns the executor name for debugging/telemetry
	Name() string
}

Executor interface abstracts different execution backends (CPU, GPU, distributed)

type ExecutorManager

type ExecutorManager struct {
	// contains filtered or unexported fields
}

ExecutorManager coordinates multiple executors and selects the best one

func NewExecutorManager

func NewExecutorManager() *ExecutorManager

NewExecutorManager creates a new executor manager with automatic backend detection

func (*ExecutorManager) AddExecutor

func (em *ExecutorManager) AddExecutor(executor Executor)

AddExecutor adds an executor to the manager

func (*ExecutorManager) SelectBest

func (em *ExecutorManager) SelectBest(op Operation, ctx ExecutionContext) Executor

SelectBest chooses the best executor for the given operation

type FastTSVSource

type FastTSVSource struct {
	Reader    io.Reader
	HasHeader bool
	Separator string // Can be any string, typically "\t" for TSV
	Headers   []string
}

FastTSVSource configuration for reading delimited data with simple string splitting. This is much faster than CSV parsing but doesn't support escaping or quoted fields

func NewFastTSVSource

func NewFastTSVSource(reader io.Reader) *FastTSVSource

NewFastTSVSource creates a fast TSV source using simple string splitting

func NewFastTSVSourceWithSeparator

func NewFastTSVSourceWithSeparator(reader io.Reader, separator string) *FastTSVSource

NewFastTSVSourceWithSeparator creates a fast delimited source with custom separator

func (*FastTSVSource) ToStream

func (s *FastTSVSource) ToStream() Stream[Record]

ToStream converts the fast TSV source to a Record stream

func (*FastTSVSource) WithHeaders

func (s *FastTSVSource) WithHeaders(headers []string) *FastTSVSource

WithHeaders sets custom headers for the fast TSV source

func (*FastTSVSource) WithoutHeader

func (s *FastTSVSource) WithoutHeader() *FastTSVSource

WithoutHeader indicates the data has no header row

type Filter

type Filter[T, U any] func(Stream[T]) Stream[U]

Filter transforms one stream into another with full type flexibility

func BottomK

func BottomK[T any](k int, cmp func(a, b T) int) Filter[T, T]

BottomK maintains the bottom K elements

func Chain

func Chain[T any](filters ...Filter[T, T]) Filter[T, T]

Chain applies multiple filters of the same type

func CountWindow

func CountWindow[T any](windowSize int) Filter[T, Stream[T]]

CountWindow groups elements into batches of windowSize elements. Each batch is emitted as a finite stream, enabling aggregations over infinite streams in manageable chunks.
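
A sketch of summing an infinite numeric stream in batches of 100 (src is an assumed Stream[int64]):

	batches := stream.CountWindow[int64](100)(src)
	for {
		w, err := batches()
		if err != nil {
			break // EOS ends the outer stream
		}
		total, _ := stream.Sum(w) // aggregate one finite batch
		fmt.Println("batch total:", total)
	}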

func CrossFlatten

func CrossFlatten(separator string, fields ...string) Filter[Record, Record]

CrossFlatten expands stream fields using cross product (cartesian product) expansion. Creates multiple output records from each input record containing stream fields. Example: {"id": 1, "tags": Stream["a", "b"]} → [{"id": 1, "tags": "a"}, {"id": 1, "tags": "b"}]

func DotFlatten

func DotFlatten(separator string, fields ...string) Filter[Record, Record]

DotFlatten flattens nested records using dot-product flattening (single output per input). Nested records become prefixed fields: {"user": {"name": "Alice"}} → {"user.name": "Alice"}. Stream fields are expanded using a dot product (linear, one-to-one mapping). When streams have different lengths, the minimum length is used and excess elements are discarded.

Example with streams:

	{"id": 1, "tags": Stream["a", "b"], "scores": Stream[10, 20]} →
		[{"id": 1, "tags": "a", "scores": 10}, {"id": 1, "tags": "b", "scores": 20}]

Example with different lengths:

	{"short": Stream["a", "b"], "long": Stream[1, 2, 3, 4]} →
		[{"short": "a", "long": 1}, {"short": "b", "long": 2}] (elements 3 and 4 discarded)

func ExtractField

func ExtractField[T any](field string) Filter[Record, T]

ExtractField gets a typed field from records

func FlatMap

func FlatMap[T, U any](fn func(T) Stream[U]) Filter[T, U]

FlatMap transforms elements and flattens the resulting streams

func FullJoin

func FullJoin(rightStream Stream[Record], leftKey, rightKey string, options ...JoinOption) Filter[Record, Record]

FullJoin performs a full outer join between the left stream and the right stream. All records from both streams are returned, with matches when available. WARNING: the right stream is collected into memory, so it must be finite and reasonably sized.

func GroupBy

func GroupBy(keyFields []string, aggregators ...AggregatorSpec[Record]) Filter[Record, Record]

GroupBy groups records by the specified fields and applies aggregations.

Returns one record per group containing:

  • The key fields from the group
  • Any additional aggregations specified

Example:

grouped := stream.GroupBy([]string{"department"})(users)
// Each result contains: department

groupedWithStats := stream.GroupBy([]string{"department"},
    stream.AvgField[float64]("avg_salary", "salary"),
    stream.CountField("count", "name"))(users)
// Each result contains: department, avg_salary, count

func InnerJoin

func InnerJoin(rightStream Stream[Record], leftKey, rightKey string, options ...JoinOption) Filter[Record, Record]

InnerJoin performs an inner join between the left stream and the right stream. Only records with matching keys in both streams are returned. WARNING: the right stream is collected into memory, so it must be finite and reasonably sized.
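
A sketch of joining users to their orders on assumed key fields "id" and "user_id" (both streams and the field names are illustrative):

	// users and orders are assumed Stream[Record] values; orders must be finite
	joined := stream.InnerJoin(orders, "id", "user_id",
		stream.WithPrefixes("user.", "order."))(users)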

func LeftJoin

func LeftJoin(rightStream Stream[Record], leftKey, rightKey string, options ...JoinOption) Filter[Record, Record]

LeftJoin performs a left join between the left stream and the right stream. All records from the left stream are returned, with matching right records when available. WARNING: the right stream is collected into memory, so it must be finite and reasonably sized.

func Limit

func Limit[T any](n int) Filter[T, T]

Limit restricts stream to first N elements (equivalent to SQL LIMIT)

func Map

func Map[T, U any](fn func(T) U) Filter[T, U]

Map transforms each element in a stream with automatic parallelization for large datasets

func Offset

func Offset[T any](n int) Filter[T, T]

Offset skips first N elements (equivalent to SQL OFFSET)

func Parallel

func Parallel[T, U any](workers int, fn func(T) U) Filter[T, U]

Parallel processes elements concurrently using simple goroutines

func Pipe

func Pipe[T, U, V any](f1 Filter[T, U], f2 Filter[U, V]) Filter[T, V]

Pipe composes two filters
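
For example, composing a Map and a Where into one reusable filter (a sketch):

	squareThenFilter := stream.Pipe(
		stream.Map(func(n int64) int64 { return n * n }),
		stream.Where(func(n int64) bool { return n > 10 }),
	)
	out, err := stream.Collect(squareThenFilter(stream.Range(1, 10, 1)))
	// on success, out contains the squares greater than 10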

func Pipe3

func Pipe3[T, U, V, W any](f1 Filter[T, U], f2 Filter[U, V], f3 Filter[V, W]) Filter[T, W]

Pipe3 composes three filters

func RightJoin

func RightJoin(rightStream Stream[Record], leftKey, rightKey string, options ...JoinOption) Filter[Record, Record]

RightJoin performs a right join between the left stream and the right stream. All records from the right stream are returned, with matching left records when available. WARNING: the right stream is collected into memory, so it must be finite and reasonably sized.

func Select

func Select(fields ...string) Filter[Record, Record]

Select extracts specific fields from records

func SessionWindow

func SessionWindow[T any](timeout time.Duration, activityDetector ActivityDetector[T]) Filter[T, Stream[T]]

SessionWindow creates activity-based windows that group related events. Windows extend as long as activity is detected within the timeout period.

func SlidingCountWindow

func SlidingCountWindow[T any](windowSize, stepSize int) Filter[T, Stream[T]]

SlidingCountWindow creates overlapping windows of size windowSize with step stepSize. Each window slides by stepSize elements, creating overlapping batches.

func Sort

func Sort[T any](cmp func(a, b T) int) Filter[T, T]

Sort sorts elements using a custom comparison function. For finite streams only; infinite streams require windowing.

func SortAsc

func SortAsc[T Comparable]() Filter[T, T]

SortAsc sorts elements in ascending order using Comparable constraint

func SortBy

func SortBy(fields ...string) Filter[Record, Record]

SortBy sorts Records by specified fields in ascending order

func SortByDesc

func SortByDesc(fields ...string) Filter[Record, Record]

SortByDesc sorts Records by specified fields in descending order

func SortCountWindow

func SortCountWindow[T any](windowSize int, cmp func(a, b T) int) Filter[T, T]

SortCountWindow sorts elements within count-based windows

func SortDesc

func SortDesc[T Comparable]() Filter[T, T]

SortDesc sorts elements in descending order using Comparable constraint

func SortTimeWindow

func SortTimeWindow[T any](duration time.Duration, cmp func(a, b T) int) Filter[T, T]

SortTimeWindow sorts elements within time-based windows

func Split

func Split(keyFields []string) Filter[Record, Stream[Record]]

Split splits a stream of records into substreams based on key fields. Each substream contains all records that share the same key values. Works with both finite and infinite streams using a central dispatcher.

func StreamingAvg

func StreamingAvg[T Numeric]() Filter[T, float64]

StreamingAvg emits running average as each element arrives.

func StreamingCount

func StreamingCount[T any]() Filter[T, int64]

StreamingCount emits running count as each element arrives.

func StreamingGroupBy

func StreamingGroupBy(keyFields []string, updateInterval int) Filter[Record, Record]

StreamingGroupBy maintains running group statistics and emits updates. Unlike regular GroupBy, this works with infinite streams by emitting updated group totals as new records arrive.

func StreamingMax

func StreamingMax[T Comparable]() Filter[T, T]

StreamingMax emits running maximum as each element arrives.

func StreamingMin

func StreamingMin[T Comparable]() Filter[T, T]

StreamingMin emits running minimum as each element arrives.

func StreamingStats

func StreamingStats[T Numeric]() Filter[T, Record]

StreamingStats emits comprehensive running statistics for each element. Returns a Record with count, sum, avg, min, max.

func StreamingSum

func StreamingSum[T Numeric]() Filter[T, T]

StreamingSum emits running sum continuously as each element arrives. Perfect for real-time dashboards and monitoring.
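
A sketch of printing a running total as values arrive (prices is an assumed Stream[float64]):

	running := stream.StreamingSum[float64]()(prices)
	for {
		total, err := running()
		if err != nil {
			break // EOS ends the stream
		}
		fmt.Println("running total:", total)
	}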

func TimeWindow

func TimeWindow[T any](duration time.Duration) Filter[T, Stream[T]]

TimeWindow groups elements into time-based windows. Collects elements for the specified duration, then emits as a finite stream.

func TopK

func TopK[T any](k int, cmp func(a, b T) int) Filter[T, T]

TopK maintains the top K elements using a heap-based approach. Suitable for infinite streams; provides approximate sorting.

func TriggeredWindow

func TriggeredWindow[T any](trigger Trigger[T]) Filter[T, Stream[T]]

TriggeredWindow creates windows based on trigger conditions

func Update

func Update(fn func(Record) Record) Filter[Record, Record]

Update modifies records

func Where

func Where[T any](predicate func(T) bool) Filter[T, T]

Where keeps only elements matching a predicate with automatic parallelization

type FloatStream

type FloatStream = Stream[float64]

type GPUExecutor

type GPUExecutor struct {
	// contains filtered or unexported fields
}

GPUExecutor implements stream operations using NVIDIA CUDA acceleration. This is a stub implementation; actual CUDA integration will be added when hardware is available.

func NewGPUExecutor

func NewGPUExecutor() *GPUExecutor

NewGPUExecutor creates a new GPU executor if CUDA hardware is available. Returns nil if no CUDA-capable GPU is detected.

func (*GPUExecutor) CanHandle

func (e *GPUExecutor) CanHandle(op Operation, ctx ExecutionContext) bool

CanHandle returns true if GPU can handle the operation efficiently

func (*GPUExecutor) ExecuteFilter

func (e *GPUExecutor) ExecuteFilter(predicate interface{}, input interface{}, ctx ExecutionContext) interface{}

ExecuteFilter performs filter operation using GPU

func (*GPUExecutor) ExecuteMap

func (e *GPUExecutor) ExecuteMap(fn interface{}, input interface{}, ctx ExecutionContext) interface{}

ExecuteMap performs map operation using GPU

func (*GPUExecutor) ExecuteParallel

func (e *GPUExecutor) ExecuteParallel(workers int, fn interface{}, input interface{}, ctx ExecutionContext) interface{}

ExecuteParallel performs parallel processing using GPU

func (*GPUExecutor) ExecuteReduce

func (e *GPUExecutor) ExecuteReduce(fn interface{}, initial interface{}, input interface{}, ctx ExecutionContext) interface{}

ExecuteReduce performs reduce operation using GPU

func (*GPUExecutor) GetScore

func (e *GPUExecutor) GetScore(op Operation, ctx ExecutionContext) int

GetScore returns a score for how well GPU handles the operation

func (*GPUExecutor) Name

func (e *GPUExecutor) Name() string

Name returns the executor name

type IntStream

type IntStream = Stream[int64]

Common stream type aliases

type JSONFormat

type JSONFormat int

JSONFormat specifies how JSON data is structured

const (
	JSONLines JSONFormat = iota // Each line is a separate JSON object
	JSONArray                   // Single JSON array containing objects
)

type JSONSink

type JSONSink struct {
	Writer io.Writer
	Format JSONFormat
	Pretty bool
}

JSONSink configuration for writing JSON data

func NewJSONSink

func NewJSONSink(writer io.Writer) *JSONSink

NewJSONSink creates a JSON sink to a writer (defaults to JSON Lines)

func NewJSONSinkToFile

func NewJSONSinkToFile(filename string) (*JSONSink, error)

func (*JSONSink) WithFormat

func (sink *JSONSink) WithFormat(format JSONFormat) *JSONSink

WithFormat sets the JSON format

func (*JSONSink) WithPrettyPrint

func (sink *JSONSink) WithPrettyPrint() *JSONSink

WithPrettyPrint enables pretty printing

func (*JSONSink) WriteRecords

func (sink *JSONSink) WriteRecords(records []Record) error

WriteRecords writes a slice of Records to JSON format

func (*JSONSink) WriteStream

func (sink *JSONSink) WriteStream(stream Stream[Record]) error

WriteStream writes a Record stream to JSON format

type JSONSource

type JSONSource struct {
	Reader io.Reader
	Format JSONFormat
}

JSONSource configuration for reading JSON data

func NewJSONSource

func NewJSONSource(reader io.Reader) *JSONSource

NewJSONSource creates a JSON source from a reader (defaults to JSON Lines)

func NewJSONSourceFromFile

func NewJSONSourceFromFile(filename string) (*JSONSource, error)

File-based JSON functions

func (*JSONSource) ToStream

func (js *JSONSource) ToStream() Stream[Record]

ToStream converts JSON data to a Record stream

func (*JSONSource) WithFormat

func (js *JSONSource) WithFormat(format JSONFormat) *JSONSource

WithFormat sets the JSON format

type JoinOption

type JoinOption func(*joinConfig)

JoinOption configures join behavior

func WithPrefixes

func WithPrefixes(leftPrefix, rightPrefix string) JoinOption

WithPrefixes sets custom prefixes for field name conflicts. The defaults are "left." and "right.".

type LateDataPolicy

type LateDataPolicy int

LateDataPolicy defines how to handle data that arrives after window firing

const (
	DropLateData   LateDataPolicy = iota // Ignore late-arriving data
	UpdateWindow                         // Update window results with late data
	SideOutputLate                       // Send late data to separate stream
)

type Numeric

type Numeric interface {
	~int | ~int8 | ~int16 | ~int32 | ~int64 |
		~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
		~float32 | ~float64
}

Numeric constraint for mathematical operations

type OpType

type OpType int

OpType represents the type of stream operation

const (
	OpMap OpType = iota
	OpFilter
	OpReduce
	OpGroupBy
	OpWindow
	OpSort
	OpParallel
)

type Operation

type Operation struct {
	Type           OpType
	DataType       reflect.Type
	InputSize      int64 // Estimated size (-1 if unknown/infinite)
	IsVectorizable bool  // Can benefit from SIMD/GPU vectorization
	MemoryUsage    int64 // Estimated memory usage in bytes
	Complexity     int   // Computational complexity (1=simple, 10=heavy)
}

Operation describes a stream operation with metadata for executor selection

func NewFilterOperation

func NewFilterOperation(dataType reflect.Type, size int64, complexity int) Operation

NewFilterOperation creates metadata for a Filter operation

func NewMapOperation

func NewMapOperation(dataType reflect.Type, size int64, complexity int) Operation

NewMapOperation creates metadata for a Map operation

func NewParallelOperation

func NewParallelOperation(dataType reflect.Type, size int64, workers int, complexity int) Operation

NewParallelOperation creates metadata for a Parallel operation

type ProtobufFormat

type ProtobufFormat int

ProtobufFormat specifies how protobuf data is structured

const (
	ProtobufDelimited ProtobufFormat = iota // Length-delimited messages (streaming)
	ProtobufJSON                            // JSON representation of protobuf
)

type ProtobufSink

type ProtobufSink struct {
	Writer      io.Writer
	MessageDesc protoreflect.MessageDescriptor
	Format      ProtobufFormat
}

ProtobufSink configuration for writing protobuf data

func NewProtobufSink

func NewProtobufSink(writer io.Writer, messageDesc protoreflect.MessageDescriptor) *ProtobufSink

NewProtobufSink creates a protobuf sink to a writer

func NewProtobufSinkToFile

func NewProtobufSinkToFile(filename string, messageDesc protoreflect.MessageDescriptor) (*ProtobufSink, error)

func (*ProtobufSink) WithFormat

func (sink *ProtobufSink) WithFormat(format ProtobufFormat) *ProtobufSink

WithFormat sets the protobuf format

func (*ProtobufSink) WriteRecords

func (sink *ProtobufSink) WriteRecords(records []Record) error

WriteRecords writes a slice of Records to protobuf format

func (*ProtobufSink) WriteStream

func (sink *ProtobufSink) WriteStream(stream Stream[Record]) error

WriteStream writes a Record stream to protobuf format

type ProtobufSource

type ProtobufSource struct {
	Reader      io.Reader
	MessageDesc protoreflect.MessageDescriptor
	Format      ProtobufFormat
}

ProtobufSource configuration for reading Protocol Buffer data

func NewProtobufSource

func NewProtobufSource(reader io.Reader, messageDesc protoreflect.MessageDescriptor) *ProtobufSource

NewProtobufSource creates a protobuf source from a reader with a message descriptor

func NewProtobufSourceFromFile

func NewProtobufSourceFromFile(filename string, messageDesc protoreflect.MessageDescriptor) (*ProtobufSource, error)

File-based protobuf functions

func (*ProtobufSource) ToStream

func (ps *ProtobufSource) ToStream() Stream[Record]

ToStream converts protobuf data to a Record stream

func (*ProtobufSource) WithFormat

func (ps *ProtobufSource) WithFormat(format ProtobufFormat) *ProtobufSource

WithFormat sets the protobuf format

type Record

type Record map[string]any

Record represents structured data with native Go types

func Aggregates

func Aggregates[T any](stream Stream[T], specs ...AggregatorSpec[T]) (Record, error)

Aggregates runs multiple named aggregators and returns results in a Record
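
A sketch of computing several named statistics in one pass over a Record stream (the field names and the orders stream are illustrative):

	result, err := stream.Aggregates(orders,
		stream.SumField[float64]("total", "amount"),
		stream.AvgField[float64]("avg", "amount"),
		stream.CountField("rows", "amount"),
	)
	// on success, result is a Record like {"total": ..., "avg": ..., "rows": ...}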

func Field

func Field[V Value](key string, value V) Record

Field creates a single-field Record with compile-time type safety

func SetField

func SetField[V Value](r Record, field string, value V) Record

SetField assigns a value to a record field with compile-time type safety
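
For example (a sketch):

	r := stream.Field("name", "Alice")       // Record with one field
	r = stream.SetField(r, "age", int64(30)) // type-checked at compile time
	r2 := r.Set("active", true)              // immutable update; r is unchanged
	_ = r2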

func (Record) Has

func (r Record) Has(field string) bool

Has checks if a field exists

func (Record) Keys

func (r Record) Keys() []string

Keys returns all field names

func (Record) Set

func (r Record) Set(field string, value any) Record

Set creates a new Record with an additional field - immutable update

type RecordTimestampExtractor

type RecordTimestampExtractor func(Record) time.Time

RecordTimestampExtractor extracts event time from Record elements

func NewRecordTimestampExtractor

func NewRecordTimestampExtractor(fieldName string) RecordTimestampExtractor

NewRecordTimestampExtractor creates a timestamp extractor for Record types that returns standardized UTC timestamps for consistent time comparison

type SessionState

type SessionState[T any] struct {
	ID           string
	StartTime    time.Time
	LastActivity time.Time
	Elements     []T
}

SessionState holds the state of a session window

type Stream

type Stream[T any] func() (T, error)

Stream represents a generic data stream, the heart of V2
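
Calling the function pulls the next element; EOS marks normal completion. A sketch of manual consumption:

	s := stream.Range(0, 5, 1)
	for {
		v, err := s()
		if errors.Is(err, stream.EOS) {
			break // normal completion
		}
		if err != nil {
			log.Fatal(err) // a real failure
		}
		fmt.Println(v)
	}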

func CSVToStream

func CSVToStream(reader io.Reader) Stream[Record]

CSVToStream reads CSV from a reader and returns a Record stream

func CSVToStreamFromFile

func CSVToStreamFromFile(filename string) (Stream[Record], error)

File-based convenience functions for backward compatibility

func FastDelimitedToStreamFromFile

func FastDelimitedToStreamFromFile(filename string, separator string) (Stream[Record], error)

FastDelimitedToStreamFromFile reads delimited file with custom separator

func FastTSVToStream

func FastTSVToStream(reader io.Reader) Stream[Record]

FastTSVToStream reads delimited data using fast string splitting

func FastTSVToStreamFromFile

func FastTSVToStreamFromFile(filename string) (Stream[Record], error)

FastTSVToStreamFromFile reads TSV file using fast string splitting

func FastTSVToStreamWithSeparator

func FastTSVToStreamWithSeparator(reader io.Reader, separator string) Stream[Record]

FastTSVToStreamWithSeparator reads delimited data with custom separator

func From

func From[V Value](items ...V) Stream[V]

From creates a type-safe stream from variadic arguments (Safe by Default)

func FromChannel

func FromChannel[V Value](ch <-chan V) Stream[V]

FromChannel creates a type-safe stream from a channel (Safe by Default). Only accepts types that are compatible with the Record Value system.

func FromChannelAny

func FromChannelAny[T any](ch <-chan T) Stream[T]

FromChannelAny creates a stream from a channel of any type. USE WITH CAUTION: this bypasses Value type safety; ensure the types are compatible with your use case.

func FromMaps

func FromMaps(maps []map[string]any) (Stream[Record], error)

FromMaps creates a Record stream from maps with field type validation

func FromMapsUnsafe

func FromMapsUnsafe(maps []map[string]any) Stream[Record]

FromMapsUnsafe creates a Record stream from maps without validation - USE WITH CAUTION

func FromRecords

func FromRecords(records []Record) (Stream[Record], error)

FromRecords creates a Record stream with field type validation

func FromRecordsUnsafe

func FromRecordsUnsafe(records []Record) Stream[Record]

FromRecordsUnsafe creates a Record stream without validation - USE WITH CAUTION

func FromSlice

func FromSlice[V Value](items []V) Stream[V]

FromSlice creates a type-safe stream from a slice (Safe by Default). Only accepts types that are compatible with the Record Value system.

func FromSliceAny

func FromSliceAny[T any](items []T) Stream[T]

FromSliceAny creates a stream from a slice of any type. USE WITH CAUTION: this bypasses Value type safety; ensure the types are compatible with your use case.

func Generate

func Generate[V Value](generator func() (V, error)) Stream[V]

Generate creates a Value-safe stream using a generator function (Safe by Default)

func GenerateAny

func GenerateAny[T any](generator func() (T, error)) Stream[T]

GenerateAny creates a stream using any type generator - USE WITH CAUTION

func JSONToStream

func JSONToStream(reader io.Reader) Stream[Record]

JSONToStream reads JSON from a reader and returns a Record stream (defaults to JSON Lines)

func JSONToStreamFromFile

func JSONToStreamFromFile(filename string) (Stream[Record], error)

func LegacyTSVToStream

func LegacyTSVToStream(reader io.Reader) Stream[Record]

LegacyTSVToStream reads TSV using the CSV parser (for backward compatibility)

func Once

func Once[V Value](item V) Stream[V]

Once creates a Value-safe stream with a single element (Safe by Default)

func OnceAny

func OnceAny[T any](item T) Stream[T]

OnceAny creates a stream with a single element of any type - USE WITH CAUTION

func ProtobufToStream

func ProtobufToStream(reader io.Reader, messageDesc protoreflect.MessageDescriptor) Stream[Record]

ProtobufToStream reads protobuf from a reader and returns a Record stream

func ProtobufToStreamFromFile

func ProtobufToStreamFromFile(filename string, messageDesc protoreflect.MessageDescriptor) (Stream[Record], error)

func Range

func Range(start, end, step int64) Stream[int64]

Range creates a numeric stream (int64 values are Value-compatible)

func TSVToStream

func TSVToStream(reader io.Reader) Stream[Record]

TSVToStream reads TSV from a reader and returns a Record stream. Now uses fast string splitting for better performance.

func TSVToStreamFromFile

func TSVToStreamFromFile(filename string) (Stream[Record], error)

func Tee

func Tee[T any](stream Stream[T], n int) []Stream[T]

Tee splits a stream into multiple identical streams for parallel consumption. Works with both finite and infinite streams using a broadcasting dispatcher with proper cleanup.
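
Because the branches are fed by a broadcasting dispatcher, it is safest to consume them concurrently. A sketch (src is an assumed finite Stream[int64]):

	branches := stream.Tee(src, 2)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		total, _ := stream.Sum(branches[0])
		fmt.Println("sum:", total)
	}()
	go func() {
		defer wg.Done()
		n, _ := stream.Count(branches[1])
		fmt.Println("count:", n)
	}()
	wg.Wait()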

func WithContext

func WithContext[T any](ctx context.Context, stream Stream[T]) Stream[T]

WithContext adds context support to a stream

type StreamingCSVWriter

type StreamingCSVWriter struct {
	// contains filtered or unexported fields
}

StreamingCSVWriter allows writing Records to CSV as they arrive (for infinite streams)

func NewStreamingCSVWriter

func NewStreamingCSVWriter(w io.Writer, headers []string) *StreamingCSVWriter

NewStreamingCSVWriter creates a streaming CSV writer

func (*StreamingCSVWriter) Close

func (scw *StreamingCSVWriter) Close() error

Close flushes and closes the writer

func (*StreamingCSVWriter) WriteRecord

func (scw *StreamingCSVWriter) WriteRecord(record Record) error

WriteRecord writes a single record to the CSV stream

type StringStream

type StringStream = Stream[string]

type TimestampedRecord

type TimestampedRecord struct {
	Record    Record
	Timestamp time.Time
}

TimestampedRecord pairs a Record with its event timestamp

type Trigger

type Trigger[T any] interface {
	ShouldFire(element T, state any) bool
	ResetState() any
}

Trigger interface defines when to emit aggregation results
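
A sketch of a custom trigger that fires a window whenever a record's assumed "level" field equals "ERROR", used with TriggeredWindow (the field name and the logs stream are illustrative):

	type errorTrigger struct{}

	func (errorTrigger) ShouldFire(element stream.Record, state any) bool {
		level, _ := stream.Get[string](element, "level")
		return level == "ERROR"
	}

	func (errorTrigger) ResetState() any { return nil }

	// usage: each emitted window ends at an ERROR record
	windows := stream.TriggeredWindow[stream.Record](errorTrigger{})(logs)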

type TypedRecord

type TypedRecord struct {
	// contains filtered or unexported fields
}

TypedRecord provides type-safe field setting with method chaining

func NewRecord

func NewRecord() *TypedRecord

NewRecord creates a new type-safe Record builder
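
For example (a sketch):

	r := stream.NewRecord().
		String("name", "Alice").
		Int("age", 30).
		Bool("active", true).
		Time("joined", time.Now()).
		Build()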

func (*TypedRecord) Bool

func (tr *TypedRecord) Bool(key string, value bool) *TypedRecord

func (*TypedRecord) Build

func (tr *TypedRecord) Build() Record

Build returns the completed Record

func (*TypedRecord) Float

func (tr *TypedRecord) Float(key string, value float64) *TypedRecord

func (*TypedRecord) Int

func (tr *TypedRecord) Int(key string, value int64) *TypedRecord

func (*TypedRecord) Record

func (tr *TypedRecord) Record(key string, value Record) *TypedRecord

func (*TypedRecord) Set

func (tr *TypedRecord) Set(key string, value any) *TypedRecord

Set adds a field with compile-time type safety

func (*TypedRecord) String

func (tr *TypedRecord) String(key string, value string) *TypedRecord

Convenience methods for common types with type safety

func (*TypedRecord) Time

func (tr *TypedRecord) Time(key string, value time.Time) *TypedRecord

type Value

Value defines the compile-time type constraint for Record field values. This replaces runtime reflection with compile-time type safety.

type ValueChangeTrigger

type ValueChangeTrigger[T any] struct {
	ExtractFunc func(T) any
	// contains filtered or unexported fields
}

ValueChangeTrigger fires when a specific field value changes

func NewValueChangeTrigger

func NewValueChangeTrigger[T any](extractFunc func(T) any) *ValueChangeTrigger[T]

func (*ValueChangeTrigger[T]) ResetState

func (vct *ValueChangeTrigger[T]) ResetState() any

func (*ValueChangeTrigger[T]) ShouldFire

func (vct *ValueChangeTrigger[T]) ShouldFire(element T, state any) bool

type WatermarkGenerator

type WatermarkGenerator func(maxEventTime time.Time) time.Time

WatermarkGenerator generates watermarks based on observed event times

func BoundedOutOfOrdernessWatermark

func BoundedOutOfOrdernessWatermark(maxLateness time.Duration) WatermarkGenerator

BoundedOutOfOrdernessWatermark generates watermarks allowing specified lateness

func PeriodicWatermarkGenerator

func PeriodicWatermarkGenerator(interval time.Duration, baseGenerator WatermarkGenerator) WatermarkGenerator

PeriodicWatermarkGenerator creates a watermark generator that updates periodically

type WatermarkTracker

type WatermarkTracker struct {
	// contains filtered or unexported fields
}

WatermarkTracker manages watermark progression for event-time processing

func NewWatermarkTracker

func NewWatermarkTracker(generator WatermarkGenerator) *WatermarkTracker

NewWatermarkTracker creates a new watermark tracker

func (*WatermarkTracker) GetWatermark

func (wt *WatermarkTracker) GetWatermark() time.Time

GetWatermark returns the current watermark

func (*WatermarkTracker) UpdateWatermark

func (wt *WatermarkTracker) UpdateWatermark(eventTime time.Time) time.Time

UpdateWatermark processes a new event time and potentially advances the watermark

type WindowBuilder

type WindowBuilder[T any] struct {
	// contains filtered or unexported fields
}

WindowBuilder provides a fluent API for configuring advanced windows

func Window

func Window[T any]() *WindowBuilder[T]

Window creates a new window builder for advanced windowing configuration

func (*WindowBuilder[T]) AccumulationMode

func (wb *WindowBuilder[T]) AccumulationMode() *WindowBuilder[T]

AccumulationMode sets the accumulation mode for late data

func (*WindowBuilder[T]) AllowLateness

func (wb *WindowBuilder[T]) AllowLateness(lateness time.Duration) *WindowBuilder[T]

AllowLateness configures how late data should be handled

func (*WindowBuilder[T]) Apply

func (wb *WindowBuilder[T]) Apply() Filter[T, Stream[T]]

Apply creates the windowing filter with the configured settings

func (*WindowBuilder[T]) DiscardingMode

func (wb *WindowBuilder[T]) DiscardingMode() *WindowBuilder[T]

DiscardingMode sets the discarding mode for late data (default)

func (*WindowBuilder[T]) TriggerOn

func (wb *WindowBuilder[T]) TriggerOn(trigger AdvancedTrigger[T]) *WindowBuilder[T]

TriggerOn adds a custom trigger to the window

func (*WindowBuilder[T]) TriggerOnCount

func (wb *WindowBuilder[T]) TriggerOnCount(count int) *WindowBuilder[T]

TriggerOnCount adds a count-based trigger to the window

func (*WindowBuilder[T]) TriggerOnProcessingTime

func (wb *WindowBuilder[T]) TriggerOnProcessingTime(interval time.Duration) *WindowBuilder[T]

TriggerOnProcessingTime adds a processing time trigger

func (*WindowBuilder[T]) TriggerOnTime

func (wb *WindowBuilder[T]) TriggerOnTime(duration time.Duration) *WindowBuilder[T]

TriggerOnTime adds a time-based trigger to the window
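
A sketch of a window configured with both a count trigger and a time trigger (whether multiple triggers combine as fire-on-first is an assumption; events is an assumed Stream[Record]):

	windows := stream.Window[stream.Record]().
		TriggerOnCount(1000).
		TriggerOnTime(30 * time.Second).
		AllowLateness(5 * time.Second).
		DiscardingMode().
		Apply()(events)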

type WindowState

type WindowState[T any] struct {
	ID           string
	StartTime    time.Time
	EndTime      time.Time
	Elements     []T
	ElementCount int
	LastActivity time.Time
	LastFired    time.Time
	Metadata     map[string]interface{}
	// contains filtered or unexported fields
}

WindowState holds the current state of a window
