Documentation ¶
Index ¶
- Variables
- func AggregateMultiple[T any, A1, A2, R1, R2 any](stream Stream[T], agg1 Aggregator[T, A1, R1], agg2 Aggregator[T, A2, R2]) (R1, R2, error)
- func AggregateTriple[T any, A1, A2, A3, R1, R2, R3 any](stream Stream[T], agg1 Aggregator[T, A1, R1], agg2 Aggregator[T, A2, R2], ...) (R1, R2, R3, error)
- func AggregateWith[T, A, R any](stream Stream[T], agg Aggregator[T, A, R]) (R, error)
- func Avg[T Numeric](stream Stream[T]) (float64, error)
- func Collect[T any](stream Stream[T]) ([]T, error)
- func Count[T any](stream Stream[T]) (int64, error)
- func EventTimeSessionWindow(sessionTimeout time.Duration, options ...EventTimeWindowOption) func(Stream[Record]) Stream[Stream[Record]]
- func EventTimeSlidingWindow(windowSize time.Duration, slideInterval time.Duration, ...) func(Stream[Record]) Stream[Stream[Record]]
- func EventTimeTumblingWindow(windowSize time.Duration, options ...EventTimeWindowOption) func(Stream[Record]) Stream[Stream[Record]]
- func ForEach[T any](fn func(T)) func(Stream[T]) error
- func Get[T any](r Record, field string) (T, bool)
- func GetOr[T any](r Record, field string, defaultVal T) T
- func IsStreamType(value any) bool
- func Max[T Comparable](stream Stream[T]) (T, error)
- func Min[T Comparable](stream Stream[T]) (T, error)
- func ParseStandardTime(val any) (time.Time, bool)
- func RunAggregator[T, A, R any](stream Stream[T], agg Aggregator[T, A, R]) (R, error)
- func StandardizeTime(t time.Time) time.Time
- func StandardizeTimeNano(t time.Time) time.Time
- func StreamToCSV(stream Stream[Record], writer io.Writer) error
- func StreamToCSVFile(stream Stream[Record], filename string) error
- func StreamToJSON(stream Stream[Record], writer io.Writer) error
- func StreamToJSONFile(stream Stream[Record], filename string) error
- func StreamToProtobuf(stream Stream[Record], writer io.Writer, ...) error
- func StreamToProtobufFile(stream Stream[Record], filename string, ...) error
- func StreamToTSV(stream Stream[Record], writer io.Writer) error
- func StreamToTSVFile(stream Stream[Record], filename string) error
- func Sum[T Numeric](stream Stream[T]) (T, error)
- type AccumulationMode
- type ActivityDetector
- type AdvancedCountTrigger
- type AdvancedProcessingTimeTrigger
- type AdvancedTimeTrigger
- type AdvancedTrigger
- type AdvancedTriggerType
- type Aggregator
- func AvgAggregator[I any, T Numeric](extract func(I) T) Aggregator[I, [2]float64, float64]
- func AvgAggregatorField[T Numeric](fieldName string) Aggregator[Record, [2]float64, float64]
- func CountAggregator[I any]() Aggregator[I, int64, int64]
- func CountAggregatorField(fieldName string) Aggregator[Record, int64, int64]
- func MaxAggregator[I any, T Comparable](extract func(I) T) Aggregator[I, *T, T]
- func MaxAggregatorField[T Comparable](fieldName string) Aggregator[Record, *T, T]
- func MinAggregator[I any, T Comparable](extract func(I) T) Aggregator[I, *T, T]
- func MinAggregatorField[T Comparable](fieldName string) Aggregator[Record, *T, T]
- func SumAggregator[I any, T Numeric](extract func(I) T) Aggregator[I, T, T]
- func SumAggregatorField[T Numeric](fieldName string) Aggregator[Record, T, T]
- type AggregatorSpec
- func AvgField[T Numeric](name, fieldName string) AggregatorSpec[Record]
- func AvgStream[T Numeric](name string) AggregatorSpec[T]
- func CountField(name, fieldName string) AggregatorSpec[Record]
- func CountStream[T any](name string) AggregatorSpec[T]
- func CustomSpec[T, A, R any](name string, agg Aggregator[T, A, R]) AggregatorSpec[T]
- func MaxField[T Comparable](name, fieldName string) AggregatorSpec[Record]
- func MaxStream[T Comparable](name string) AggregatorSpec[T]
- func MinField[T Comparable](name, fieldName string) AggregatorSpec[Record]
- func MinStream[T Comparable](name string) AggregatorSpec[T]
- func SumField[T Numeric](name, fieldName string) AggregatorSpec[Record]
- func SumStream[T Numeric](name string) AggregatorSpec[T]
- type BoolStream
- type CPUExecutor
- func (e *CPUExecutor) CanHandle(op Operation, ctx ExecutionContext) bool
- func (e *CPUExecutor) ExecuteFilter(predicate interface{}, input interface{}, ctx ExecutionContext) interface{}
- func (e *CPUExecutor) ExecuteMap(fn interface{}, input interface{}, ctx ExecutionContext) interface{}
- func (e *CPUExecutor) ExecuteParallel(workers int, fn interface{}, input interface{}, ctx ExecutionContext) interface{}
- func (e *CPUExecutor) ExecuteReduce(fn interface{}, initial interface{}, input interface{}, ctx ExecutionContext) interface{}
- func (e *CPUExecutor) GetScore(op Operation, ctx ExecutionContext) int
- func (e *CPUExecutor) Name() string
- type CSVSink
- type CSVSource
- type Comparable
- type CountTrigger
- type EventTimeSessionState
- type EventTimeWindowConfig
- type EventTimeWindowOption
- func WithAllowedLateness(lateness time.Duration) EventTimeWindowOption
- func WithLateDataPolicy(policy LateDataPolicy) EventTimeWindowOption
- func WithTimestampExtractor(extractor RecordTimestampExtractor) EventTimeWindowOption
- func WithWatermarkGenerator(generator WatermarkGenerator) EventTimeWindowOption
- type EventTimeWindowState
- type ExecutionContext
- type Executor
- type ExecutorManager
- type FastTSVSource
- type Filter
- func BottomK[T any](k int, cmp func(a, b T) int) Filter[T, T]
- func Chain[T any](filters ...Filter[T, T]) Filter[T, T]
- func CountWindow[T any](windowSize int) Filter[T, Stream[T]]
- func CrossFlatten(separator string, fields ...string) Filter[Record, Record]
- func DotFlatten(separator string, fields ...string) Filter[Record, Record]
- func ExtractField[T any](field string) Filter[Record, T]
- func FlatMap[T, U any](fn func(T) Stream[U]) Filter[T, U]
- func FullJoin(rightStream Stream[Record], leftKey, rightKey string, options ...JoinOption) Filter[Record, Record]
- func GroupBy(keyFields []string, aggregators ...AggregatorSpec[Record]) Filter[Record, Record]
- func InnerJoin(rightStream Stream[Record], leftKey, rightKey string, options ...JoinOption) Filter[Record, Record]
- func LeftJoin(rightStream Stream[Record], leftKey, rightKey string, options ...JoinOption) Filter[Record, Record]
- func Limit[T any](n int) Filter[T, T]
- func Map[T, U any](fn func(T) U) Filter[T, U]
- func Offset[T any](n int) Filter[T, T]
- func Parallel[T, U any](workers int, fn func(T) U) Filter[T, U]
- func Pipe[T, U, V any](f1 Filter[T, U], f2 Filter[U, V]) Filter[T, V]
- func Pipe3[T, U, V, W any](f1 Filter[T, U], f2 Filter[U, V], f3 Filter[V, W]) Filter[T, W]
- func RightJoin(rightStream Stream[Record], leftKey, rightKey string, options ...JoinOption) Filter[Record, Record]
- func Select(fields ...string) Filter[Record, Record]
- func SessionWindow[T any](timeout time.Duration, activityDetector ActivityDetector[T]) Filter[T, Stream[T]]
- func SlidingCountWindow[T any](windowSize, stepSize int) Filter[T, Stream[T]]
- func Sort[T any](cmp func(a, b T) int) Filter[T, T]
- func SortAsc[T Comparable]() Filter[T, T]
- func SortBy(fields ...string) Filter[Record, Record]
- func SortByDesc(fields ...string) Filter[Record, Record]
- func SortCountWindow[T any](windowSize int, cmp func(a, b T) int) Filter[T, T]
- func SortDesc[T Comparable]() Filter[T, T]
- func SortTimeWindow[T any](duration time.Duration, cmp func(a, b T) int) Filter[T, T]
- func Split(keyFields []string) Filter[Record, Stream[Record]]
- func StreamingAvg[T Numeric]() Filter[T, float64]
- func StreamingCount[T any]() Filter[T, int64]
- func StreamingGroupBy(keyFields []string, updateInterval int) Filter[Record, Record]
- func StreamingMax[T Comparable]() Filter[T, T]
- func StreamingMin[T Comparable]() Filter[T, T]
- func StreamingStats[T Numeric]() Filter[T, Record]
- func StreamingSum[T Numeric]() Filter[T, T]
- func TimeWindow[T any](duration time.Duration) Filter[T, Stream[T]]
- func TopK[T any](k int, cmp func(a, b T) int) Filter[T, T]
- func TriggeredWindow[T any](trigger Trigger[T]) Filter[T, Stream[T]]
- func Update(fn func(Record) Record) Filter[Record, Record]
- func Where[T any](predicate func(T) bool) Filter[T, T]
- type FloatStream
- type GPUExecutor
- func (e *GPUExecutor) CanHandle(op Operation, ctx ExecutionContext) bool
- func (e *GPUExecutor) ExecuteFilter(predicate interface{}, input interface{}, ctx ExecutionContext) interface{}
- func (e *GPUExecutor) ExecuteMap(fn interface{}, input interface{}, ctx ExecutionContext) interface{}
- func (e *GPUExecutor) ExecuteParallel(workers int, fn interface{}, input interface{}, ctx ExecutionContext) interface{}
- func (e *GPUExecutor) ExecuteReduce(fn interface{}, initial interface{}, input interface{}, ctx ExecutionContext) interface{}
- func (e *GPUExecutor) GetScore(op Operation, ctx ExecutionContext) int
- func (e *GPUExecutor) Name() string
- type IntStream
- type JSONFormat
- type JSONSink
- type JSONSource
- type JoinOption
- type LateDataPolicy
- type Numeric
- type OpType
- type Operation
- type ProtobufFormat
- type ProtobufSink
- type ProtobufSource
- type Record
- type RecordTimestampExtractor
- type SessionState
- type Stream
- func CSVToStream(reader io.Reader) Stream[Record]
- func CSVToStreamFromFile(filename string) (Stream[Record], error)
- func FastDelimitedToStreamFromFile(filename string, separator string) (Stream[Record], error)
- func FastTSVToStream(reader io.Reader) Stream[Record]
- func FastTSVToStreamFromFile(filename string) (Stream[Record], error)
- func FastTSVToStreamWithSeparator(reader io.Reader, separator string) Stream[Record]
- func From[V Value](items ...V) Stream[V]
- func FromChannel[V Value](ch <-chan V) Stream[V]
- func FromChannelAny[T any](ch <-chan T) Stream[T]
- func FromMaps(maps []map[string]any) (Stream[Record], error)
- func FromMapsUnsafe(maps []map[string]any) Stream[Record]
- func FromRecords(records []Record) (Stream[Record], error)
- func FromRecordsUnsafe(records []Record) Stream[Record]
- func FromSlice[V Value](items []V) Stream[V]
- func FromSliceAny[T any](items []T) Stream[T]
- func Generate[V Value](generator func() (V, error)) Stream[V]
- func GenerateAny[T any](generator func() (T, error)) Stream[T]
- func JSONToStream(reader io.Reader) Stream[Record]
- func JSONToStreamFromFile(filename string) (Stream[Record], error)
- func LegacyTSVToStream(reader io.Reader) Stream[Record]
- func Once[V Value](item V) Stream[V]
- func OnceAny[T any](item T) Stream[T]
- func ProtobufToStream(reader io.Reader, messageDesc protoreflect.MessageDescriptor) Stream[Record]
- func ProtobufToStreamFromFile(filename string, messageDesc protoreflect.MessageDescriptor) (Stream[Record], error)
- func Range(start, end, step int64) Stream[int64]
- func TSVToStream(reader io.Reader) Stream[Record]
- func TSVToStreamFromFile(filename string) (Stream[Record], error)
- func Tee[T any](stream Stream[T], n int) []Stream[T]
- func WithContext[T any](ctx context.Context, stream Stream[T]) Stream[T]
- type StreamingCSVWriter
- type StringStream
- type TimestampedRecord
- type Trigger
- type TypedRecord
- func (tr *TypedRecord) Bool(key string, value bool) *TypedRecord
- func (tr *TypedRecord) Build() Record
- func (tr *TypedRecord) Float(key string, value float64) *TypedRecord
- func (tr *TypedRecord) Int(key string, value int64) *TypedRecord
- func (tr *TypedRecord) Record(key string, value Record) *TypedRecord
- func (tr *TypedRecord) Set(key string, value any) *TypedRecord
- func (tr *TypedRecord) String(key string, value string) *TypedRecord
- func (tr *TypedRecord) Time(key string, value time.Time) *TypedRecord
- type Value
- type ValueChangeTrigger
- type WatermarkGenerator
- type WatermarkTracker
- type WindowBuilder
- func (wb *WindowBuilder[T]) AccumulationMode() *WindowBuilder[T]
- func (wb *WindowBuilder[T]) AllowLateness(lateness time.Duration) *WindowBuilder[T]
- func (wb *WindowBuilder[T]) Apply() Filter[T, Stream[T]]
- func (wb *WindowBuilder[T]) DiscardingMode() *WindowBuilder[T]
- func (wb *WindowBuilder[T]) TriggerOn(trigger AdvancedTrigger[T]) *WindowBuilder[T]
- func (wb *WindowBuilder[T]) TriggerOnCount(count int) *WindowBuilder[T]
- func (wb *WindowBuilder[T]) TriggerOnProcessingTime(interval time.Duration) *WindowBuilder[T]
- func (wb *WindowBuilder[T]) TriggerOnTime(duration time.Duration) *WindowBuilder[T]
- type WindowState
Constants ¶
This section is empty.
Variables ¶
var EOS = errors.New("end of stream")
EOS signals end of stream
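A minimal sketch of ending a generated stream with EOS, using the documented Generate and Collect signatures. The import path is a placeholder (the package name stream follows the GroupBy example later in these docs), and the sketch assumes Collect treats EOS as normal termination.

package main

import (
    "fmt"

    stream "example.com/yourproject/stream" // placeholder: substitute the real module path
)

func main() {
    // Produce a bounded stream by returning EOS from the generator once done.
    n := int64(0)
    src := stream.Generate(func() (int64, error) {
        if n >= 5 {
            return 0, stream.EOS // signal end of stream
        }
        n++
        return n, nil
    })

    vals, err := stream.Collect(src)
    if err != nil {
        panic(err)
    }
    fmt.Println(vals) // expected: [1 2 3 4 5]
}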
Functions ¶
func AggregateMultiple ¶
func AggregateMultiple[T any, A1, A2, R1, R2 any](stream Stream[T], agg1 Aggregator[T, A1, R1], agg2 Aggregator[T, A2, R2]) (R1, R2, error)
AggregateMultiple runs multiple aggregators in parallel using Tee
func AggregateTriple ¶
func AggregateTriple[T any, A1, A2, A3, R1, R2, R3 any](stream Stream[T], agg1 Aggregator[T, A1, R1], agg2 Aggregator[T, A2, R2], agg3 Aggregator[T, A3, R3]) (R1, R2, R3, error)
AggregateTriple runs three aggregators in parallel
func AggregateWith ¶
func AggregateWith[T, A, R any](stream Stream[T], agg Aggregator[T, A, R]) (R, error)
AggregateWith runs a single custom aggregator on a stream
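A short sketch of AggregateWith with one of the documented aggregator constructors; the import path is a placeholder and the printed value is only the expected result.

package main

import (
    "fmt"

    stream "example.com/yourproject/stream" // placeholder module path
)

func main() {
    orders := stream.FromSlice([]float64{19.99, 5.00, 42.50})

    // Sum the raw values; the extractor is the identity function here.
    sum, err := stream.AggregateWith(orders, stream.SumAggregator(func(v float64) float64 { return v }))
    if err != nil {
        panic(err)
    }
    fmt.Println(sum) // expected: 67.49
}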
func EventTimeSessionWindow ¶
func EventTimeSessionWindow(sessionTimeout time.Duration, options ...EventTimeWindowOption) func(Stream[Record]) Stream[Stream[Record]]
EventTimeSessionWindow creates session windows based on event time for Records
func EventTimeSlidingWindow ¶
func EventTimeSlidingWindow(windowSize time.Duration, slideInterval time.Duration, options ...EventTimeWindowOption) func(Stream[Record]) Stream[Stream[Record]]
EventTimeSlidingWindow creates sliding windows based on event time for Records
func EventTimeTumblingWindow ¶
func EventTimeTumblingWindow(windowSize time.Duration, options ...EventTimeWindowOption) func(Stream[Record]) Stream[Stream[Record]]
EventTimeTumblingWindow creates tumbling windows based on event time for Records
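A sketch of one-minute tumbling windows over Records keyed on a "ts" field, using the documented option constructors. The import path is a placeholder; how many windows fire for this tiny input depends on the implementation's watermarking.

package main

import (
    "fmt"
    "time"

    stream "example.com/yourproject/stream" // placeholder module path
)

func main() {
    events, err := stream.FromMaps([]map[string]any{
        {"user": "a", "ts": time.Date(2024, 1, 1, 10, 0, 5, 0, time.UTC)},
        {"user": "b", "ts": time.Date(2024, 1, 1, 10, 0, 40, 0, time.UTC)},
        {"user": "a", "ts": time.Date(2024, 1, 1, 10, 1, 10, 0, time.UTC)},
    })
    if err != nil {
        panic(err)
    }

    // One-minute tumbling windows on the "ts" field, tolerating 30s of lateness.
    windowed := stream.EventTimeTumblingWindow(
        time.Minute,
        stream.WithTimestampExtractor(stream.NewRecordTimestampExtractor("ts")),
        stream.WithAllowedLateness(30*time.Second),
    )(events)

    windows, err := stream.Collect(windowed)
    if err != nil {
        panic(err)
    }
    for i, w := range windows {
        n, _ := stream.Count(w)
        fmt.Printf("window %d: %d records\n", i, n)
    }
}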
func IsStreamType ¶
IsStreamType checks if a value is a Stream[T] type
func ParseStandardTime ¶
ParseStandardTime parses various time formats and returns standardized UTC time
func RunAggregator ¶
func RunAggregator[T, A, R any](stream Stream[T], agg Aggregator[T, A, R]) (R, error)
RunAggregator runs a single aggregator on a stream and returns the result
func StandardizeTime ¶
StandardizeTime converts any time value to UTC and rounds to nearest second for consistent time comparisons across different timezones and precisions
func StandardizeTimeNano ¶
StandardizeTimeNano converts time to UTC and rounds to nearest nanosecond for high-precision time operations while ensuring timezone consistency
func StreamToCSV ¶
StreamToCSV writes a Record stream to a CSV writer
func StreamToJSON ¶
StreamToJSON writes a Record stream to a JSON writer (defaults to JSON Lines)
func StreamToProtobuf ¶
func StreamToProtobuf(stream Stream[Record], writer io.Writer, messageDesc protoreflect.MessageDescriptor) error
StreamToProtobuf writes a Record stream to a protobuf writer
func StreamToProtobufFile ¶
func StreamToProtobufFile(stream Stream[Record], filename string, messageDesc protoreflect.MessageDescriptor) error
func StreamToTSV ¶
StreamToTSV writes a Record stream to a TSV writer
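A minimal sketch of writing a Record stream as CSV, using the documented StreamToCSV signature; the import path is a placeholder.

package main

import (
    "os"

    stream "example.com/yourproject/stream" // placeholder module path
)

func main() {
    recs, err := stream.FromMaps([]map[string]any{
        {"id": int64(1), "name": "alpha"},
        {"id": int64(2), "name": "beta"},
    })
    if err != nil {
        panic(err)
    }

    // Write the Record stream as CSV to stdout; StreamToCSVFile writes to a path instead.
    if err := stream.StreamToCSV(recs, os.Stdout); err != nil {
        panic(err)
    }
}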
Types ¶
type AccumulationMode ¶
type AccumulationMode int
AccumulationMode defines how late data is handled
const (
DiscardingMode AccumulationMode = iota // Discard late data
AccumulateMode // Accumulate late data into existing windows
)
type ActivityDetector ¶
ActivityDetector determines if an element represents activity for session windows
type AdvancedCountTrigger ¶
type AdvancedCountTrigger[T any] struct {
// contains filtered or unexported fields
}
AdvancedCountTrigger fires when element count reaches threshold
func NewAdvancedCountTrigger ¶
func NewAdvancedCountTrigger[T any](threshold int) *AdvancedCountTrigger[T]
func (*AdvancedCountTrigger[T]) GetType ¶
func (t *AdvancedCountTrigger[T]) GetType() AdvancedTriggerType
func (*AdvancedCountTrigger[T]) Reset ¶
func (t *AdvancedCountTrigger[T]) Reset()
func (*AdvancedCountTrigger[T]) ShouldFire ¶
func (t *AdvancedCountTrigger[T]) ShouldFire(window *WindowState[T], element T, processingTime time.Time) bool
type AdvancedProcessingTimeTrigger ¶
type AdvancedProcessingTimeTrigger[T any] struct {
// contains filtered or unexported fields
}
AdvancedProcessingTimeTrigger fires at regular processing time intervals
func NewAdvancedProcessingTimeTrigger ¶
func NewAdvancedProcessingTimeTrigger[T any](interval time.Duration) *AdvancedProcessingTimeTrigger[T]
func (*AdvancedProcessingTimeTrigger[T]) GetType ¶
func (t *AdvancedProcessingTimeTrigger[T]) GetType() AdvancedTriggerType
func (*AdvancedProcessingTimeTrigger[T]) Reset ¶
func (t *AdvancedProcessingTimeTrigger[T]) Reset()
func (*AdvancedProcessingTimeTrigger[T]) ShouldFire ¶
func (t *AdvancedProcessingTimeTrigger[T]) ShouldFire(window *WindowState[T], element T, processingTime time.Time) bool
type AdvancedTimeTrigger ¶
type AdvancedTimeTrigger[T any] struct {
// contains filtered or unexported fields
}
AdvancedTimeTrigger fires after a duration since window start
func NewAdvancedTimeTrigger ¶
func NewAdvancedTimeTrigger[T any](duration time.Duration) *AdvancedTimeTrigger[T]
func (*AdvancedTimeTrigger[T]) GetType ¶
func (t *AdvancedTimeTrigger[T]) GetType() AdvancedTriggerType
func (*AdvancedTimeTrigger[T]) Reset ¶
func (t *AdvancedTimeTrigger[T]) Reset()
func (*AdvancedTimeTrigger[T]) ShouldFire ¶
func (t *AdvancedTimeTrigger[T]) ShouldFire(window *WindowState[T], element T, processingTime time.Time) bool
type AdvancedTrigger ¶
type AdvancedTrigger[T any] interface {
// ShouldFire returns true if the window should fire based on current state
ShouldFire(window *WindowState[T], element T, processingTime time.Time) bool
// Reset resets the trigger state (called after firing)
Reset()
// GetType returns the trigger type for optimization
GetType() AdvancedTriggerType
}
AdvancedTrigger defines when a window should fire/emit results in advanced windowing
type AdvancedTriggerType ¶
type AdvancedTriggerType int
AdvancedTriggerType defines different trigger conditions for advanced windowing
const (
AdvancedCountTriggerType AdvancedTriggerType = iota
AdvancedTimeTriggerType
AdvancedProcessingTimeTriggerType
AdvancedCustomTriggerType
)
type Aggregator ¶
type Aggregator[T, A, R any] struct {
Initial func() A // Create initial accumulator
Accumulate func(A, T) A // Process each element
Finalize func(A) R // Produce final result
}
Aggregator defines a composable aggregation operation
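A sketch of a custom Aggregator built from the three documented fields (Initial, Accumulate, Finalize) and run with RunAggregator; the import path is a placeholder.

package main

import (
    "fmt"
    "strings"

    stream "example.com/yourproject/stream" // placeholder module path
)

func main() {
    // Accumulator type A is []string, result type R is string.
    joined := stream.Aggregator[string, []string, string]{
        Initial:    func() []string { return nil },
        Accumulate: func(acc []string, s string) []string { return append(acc, s) },
        Finalize:   func(acc []string) string { return strings.Join(acc, ", ") },
    }

    words := stream.FromSlice([]string{"red", "green", "blue"})
    out, err := stream.RunAggregator(words, joined)
    if err != nil {
        panic(err)
    }
    fmt.Println(out) // expected: red, green, blue
}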
func AvgAggregator ¶
func AvgAggregator[I any, T Numeric](extract func(I) T) Aggregator[I, [2]float64, float64]
AvgAggregator creates an average aggregator with custom value extraction
func AvgAggregatorField ¶
AvgAggregatorField creates an aggregator that averages a numeric field in records
func CountAggregator ¶
func CountAggregator[I any]() Aggregator[I, int64, int64]
CountAggregator creates a count aggregator (doesn't need value extraction)
func CountAggregatorField ¶
func CountAggregatorField(fieldName string) Aggregator[Record, int64, int64]
CountAggregatorField creates an aggregator that counts records (field name is ignored but maintained for consistency)
func MaxAggregator ¶
func MaxAggregator[I any, T Comparable](extract func(I) T) Aggregator[I, *T, T]
MaxAggregator creates a max aggregator with custom value extraction
func MaxAggregatorField ¶
func MaxAggregatorField[T Comparable](fieldName string) Aggregator[Record, *T, T]
MaxAggregatorField creates an aggregator that finds the maximum of a field in records
func MinAggregator ¶
func MinAggregator[I any, T Comparable](extract func(I) T) Aggregator[I, *T, T]
MinAggregator creates a min aggregator with custom value extraction
func MinAggregatorField ¶
func MinAggregatorField[T Comparable](fieldName string) Aggregator[Record, *T, T]
MinAggregatorField creates an aggregator that finds the minimum of a field in records
func SumAggregator ¶
func SumAggregator[I any, T Numeric](extract func(I) T) Aggregator[I, T, T]
SumAggregator creates a sum aggregator with custom value extraction
func SumAggregatorField ¶
func SumAggregatorField[T Numeric](fieldName string) Aggregator[Record, T, T]
SumAggregatorField creates an aggregator that sums a numeric field in records
type AggregatorSpec ¶
AggregatorSpec represents a named aggregator specification
func AvgField ¶
func AvgField[T Numeric](name, fieldName string) AggregatorSpec[Record]
AvgField creates an aggregator that averages a numeric field in records
func AvgStream ¶
func AvgStream[T Numeric](name string) AggregatorSpec[T]
func CountField ¶
func CountField(name, fieldName string) AggregatorSpec[Record]
CountField creates an aggregator that counts records (field name is ignored but maintained for consistency)
func CountStream ¶
func CountStream[T any](name string) AggregatorSpec[T]
func CustomSpec ¶
func CustomSpec[T, A, R any](name string, agg Aggregator[T, A, R]) AggregatorSpec[T]
CustomSpec creates a spec for any custom aggregator
func MaxField ¶
func MaxField[T Comparable](name, fieldName string) AggregatorSpec[Record]
MaxField creates an aggregator that finds the maximum of a field in records
func MaxStream ¶
func MaxStream[T Comparable](name string) AggregatorSpec[T]
func MinField ¶
func MinField[T Comparable](name, fieldName string) AggregatorSpec[Record]
MinField creates an aggregator that finds the minimum of a field in records
func MinStream ¶
func MinStream[T Comparable](name string) AggregatorSpec[T]
func SumField ¶
func SumField[T Numeric](name, fieldName string) AggregatorSpec[Record]
SumField creates an aggregator that sums a numeric field in records
func SumStream ¶
func SumStream[T Numeric](name string) AggregatorSpec[T]
Helper functions to create aggregator specs
type BoolStream ¶
type CPUExecutor ¶
type CPUExecutor struct {
// contains filtered or unexported fields
}
CPUExecutor implements stream operations using CPU-based processing
func (*CPUExecutor) CanHandle ¶
func (e *CPUExecutor) CanHandle(op Operation, ctx ExecutionContext) bool
CanHandle returns true - CPU executor can handle any operation
func (*CPUExecutor) ExecuteFilter ¶
func (e *CPUExecutor) ExecuteFilter(predicate interface{}, input interface{}, ctx ExecutionContext) interface{}
ExecuteFilter performs filter operation using CPU
func (*CPUExecutor) ExecuteMap ¶
func (e *CPUExecutor) ExecuteMap(fn interface{}, input interface{}, ctx ExecutionContext) interface{}
ExecuteMap performs map operation using CPU
func (*CPUExecutor) ExecuteParallel ¶
func (e *CPUExecutor) ExecuteParallel(workers int, fn interface{}, input interface{}, ctx ExecutionContext) interface{}
ExecuteParallel performs parallel processing using CPU
func (*CPUExecutor) ExecuteReduce ¶
func (e *CPUExecutor) ExecuteReduce(fn interface{}, initial interface{}, input interface{}, ctx ExecutionContext) interface{}
ExecuteReduce performs reduce operation using CPU
func (*CPUExecutor) GetScore ¶
func (e *CPUExecutor) GetScore(op Operation, ctx ExecutionContext) int
GetScore returns a score for how well CPU handles the operation
type CSVSink ¶
type CSVSink struct {
Writer io.Writer
Separator rune
Headers []string
// contains filtered or unexported fields
}
CSVSink configuration for writing CSV data
func NewCSVSink ¶
NewCSVSink creates a CSV sink to a writer
func NewCSVSinkToFile ¶
NewCSVSinkToFile creates a CSV sink to a file
func NewTSVSink ¶
NewTSVSink creates a TSV sink to a writer
func NewTSVSinkToFile ¶
NewTSVSinkToFile creates a TSV sink to a file
func (*CSVSink) WithHeaders ¶
WithHeaders sets the headers for CSV output
func (*CSVSink) WriteRecords ¶
WriteRecords writes a slice of Records to CSV format
type CSVSource ¶
CSVSource configuration for reading CSV data
func NewCSVSource ¶
NewCSVSource creates a CSV source from a reader
func NewCSVSourceFromFile ¶
NewCSVSourceFromFile creates a CSV source from a file
func NewTSVSource ¶
NewTSVSource creates a TSV (tab-separated values) source using the CSV parser. Use NewFastTSVSource for better performance if you don't need CSV escaping.
func NewTSVSourceFromFile ¶
NewTSVSourceFromFile creates a TSV source from a file
func (*CSVSource) WithHeaders ¶
WithHeaders sets custom headers for the CSV
func (*CSVSource) WithoutHeaders ¶
WithoutHeaders configures CSV to not expect headers
type Comparable ¶
type Comparable interface {
~int | ~int8 | ~int16 | ~int32 | ~int64 |
~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
~float32 | ~float64 | ~string
}
Comparable constraint for ordering operations
type CountTrigger ¶
CountTrigger fires every N elements
func NewCountTrigger ¶
func NewCountTrigger[T any](n int) *CountTrigger[T]
func (*CountTrigger[T]) ResetState ¶
func (ct *CountTrigger[T]) ResetState() any
func (*CountTrigger[T]) ShouldFire ¶
func (ct *CountTrigger[T]) ShouldFire(element T, state any) bool
type EventTimeSessionState ¶
type EventTimeSessionState struct {
// contains filtered or unexported fields
}
EventTimeSessionState tracks the state of an event-time session window
func NewEventTimeSessionState ¶
func NewEventTimeSessionState(policy LateDataPolicy) *EventTimeSessionState
NewEventTimeSessionState creates a new session state
func (*EventTimeSessionState) AddElement ¶
func (ss *EventTimeSessionState) AddElement(element Record, eventTime time.Time)
AddElement adds an element to the session
func (*EventTimeSessionState) Fire ¶
func (ss *EventTimeSessionState) Fire() []Record
Fire triggers the session to emit results
func (*EventTimeSessionState) HasFired ¶
func (ss *EventTimeSessionState) HasFired() bool
HasFired returns true if the session has already fired
func (*EventTimeSessionState) ShouldFire ¶
ShouldFire determines if the session should fire based on timeout and watermark
type EventTimeWindowConfig ¶
type EventTimeWindowConfig struct {
TimestampExtractor RecordTimestampExtractor
WatermarkGenerator WatermarkGenerator
LateDataPolicy LateDataPolicy
AllowedLateness time.Duration
}
EventTimeWindowConfig holds configuration for event-time windows on Records
type EventTimeWindowOption ¶
type EventTimeWindowOption func(*EventTimeWindowConfig)
EventTimeWindowOption configures event-time windows
func WithAllowedLateness ¶
func WithAllowedLateness(lateness time.Duration) EventTimeWindowOption
WithAllowedLateness sets the maximum allowed lateness (convenience for watermark generation)
func WithLateDataPolicy ¶
func WithLateDataPolicy(policy LateDataPolicy) EventTimeWindowOption
WithLateDataPolicy sets how late data should be handled
func WithTimestampExtractor ¶
func WithTimestampExtractor(extractor RecordTimestampExtractor) EventTimeWindowOption
WithTimestampExtractor sets the timestamp extraction function
func WithWatermarkGenerator ¶
func WithWatermarkGenerator(generator WatermarkGenerator) EventTimeWindowOption
WithWatermarkGenerator sets the watermark generation strategy
type EventTimeWindowState ¶
type EventTimeWindowState struct {
// contains filtered or unexported fields
}
EventTimeWindowState tracks the state of an event-time window for Records
func NewEventTimeWindowState ¶
func NewEventTimeWindowState(start, end time.Time, policy LateDataPolicy) *EventTimeWindowState
NewEventTimeWindowState creates a new event-time window state for Records
func (*EventTimeWindowState) AddElement ¶
func (ws *EventTimeWindowState) AddElement(element Record, eventTime time.Time) bool
AddElement adds an element to the window if it belongs to this window
func (*EventTimeWindowState) Fire ¶
func (ws *EventTimeWindowState) Fire() []Record
Fire triggers the window to emit results
func (*EventTimeWindowState) HasFired ¶
func (ws *EventTimeWindowState) HasFired() bool
HasFired returns true if the window has already fired
func (*EventTimeWindowState) ShouldFire ¶
func (ws *EventTimeWindowState) ShouldFire(watermark time.Time) bool
ShouldFire determines if the window should fire based on watermark
type ExecutionContext ¶
type ExecutionContext struct {
Ctx context.Context
MaxMemory int64 // Available memory limit
MaxGoroutines int // CPU parallelism limit
GPUMemory int64 // Available GPU memory (0 if no GPU)
BatchSize int // Preferred batch size for vectorization
}
ExecutionContext provides runtime information for operation execution
type Executor ¶
type Executor interface {
// CanHandle returns true if this executor can efficiently handle the operation
CanHandle(op Operation, ctx ExecutionContext) bool
// GetScore returns a score (0-100) indicating how well this executor handles the operation
// Higher scores indicate better suitability
GetScore(op Operation, ctx ExecutionContext) int
// ExecuteMap performs map operation with this executor
ExecuteMap(fn interface{}, input interface{}, ctx ExecutionContext) interface{}
// ExecuteFilter performs filter operation with this executor
ExecuteFilter(predicate interface{}, input interface{}, ctx ExecutionContext) interface{}
// ExecuteReduce performs reduce operation with this executor
ExecuteReduce(fn interface{}, initial interface{}, input interface{}, ctx ExecutionContext) interface{}
// ExecuteParallel performs parallel processing with this executor
ExecuteParallel(workers int, fn interface{}, input interface{}, ctx ExecutionContext) interface{}
// Name returns the executor name for debugging/telemetry
Name() string
}
Executor interface abstracts different execution backends (CPU, GPU, distributed)
type ExecutorManager ¶
type ExecutorManager struct {
// contains filtered or unexported fields
}
ExecutorManager coordinates multiple executors and selects the best one
func NewExecutorManager ¶
func NewExecutorManager() *ExecutorManager
NewExecutorManager creates a new executor manager with automatic backend detection
func (*ExecutorManager) AddExecutor ¶
func (em *ExecutorManager) AddExecutor(executor Executor)
AddExecutor adds an executor to the manager
func (*ExecutorManager) SelectBest ¶
func (em *ExecutorManager) SelectBest(op Operation, ctx ExecutionContext) Executor
SelectBest chooses the best executor for the given operation
type FastTSVSource ¶
type FastTSVSource struct {
Reader io.Reader
HasHeader bool
Separator string // Can be any string, typically "\t" for TSV
Headers []string
}
FastTSVSource configuration for reading delimited data with simple string splitting. This is much faster than CSV parsing but doesn't support escaping or quoted fields.
func NewFastTSVSource ¶
func NewFastTSVSource(reader io.Reader) *FastTSVSource
NewFastTSVSource creates a fast TSV source using simple string splitting
func NewFastTSVSourceWithSeparator ¶
func NewFastTSVSourceWithSeparator(reader io.Reader, separator string) *FastTSVSource
NewFastTSVSourceWithSeparator creates a fast delimited source with custom separator
func (*FastTSVSource) ToStream ¶
func (s *FastTSVSource) ToStream() Stream[Record]
ToStream converts the fast TSV source to a Record stream
func (*FastTSVSource) WithHeaders ¶
func (s *FastTSVSource) WithHeaders(headers []string) *FastTSVSource
WithHeaders sets custom headers for the fast TSV source
func (*FastTSVSource) WithoutHeader ¶
func (s *FastTSVSource) WithoutHeader() *FastTSVSource
WithoutHeader indicates the data has no header row
type Filter ¶
Filter transforms one stream into another with full type flexibility
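A minimal sketch of composing filters with Where, Map, and Pipe. The import path is a placeholder, and the sketch assumes a Filter is applied by calling it on a stream, as in the package's own GroupBy example; the output assumes Range's end bound is exclusive.

package main

import (
    "fmt"

    stream "example.com/yourproject/stream" // placeholder module path
)

func main() {
    // Compose two filters: keep even numbers, then square them.
    evens := stream.Where(func(n int64) bool { return n%2 == 0 })
    square := stream.Map(func(n int64) int64 { return n * n })
    pipeline := stream.Pipe(evens, square)

    // Apply the composed filter to a stream.
    out, err := stream.Collect(pipeline(stream.Range(1, 11, 1)))
    if err != nil {
        panic(err)
    }
    fmt.Println(out) // expected: [4 16 36 64 100]
}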
func CountWindow ¶
CountWindow groups elements into batches of N elements. Each batch is emitted as a finite stream, making aggregations over infinite streams possible by processing them in manageable chunks.
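A sketch of batching a stream with CountWindow and summing each batch; placeholder import path, and the window filter is applied as a function as above.

package main

import (
    "fmt"

    stream "example.com/yourproject/stream" // placeholder module path
)

func main() {
    // Batch the stream into windows of 100 elements, then sum each batch.
    windows := stream.CountWindow[int64](100)(stream.Range(0, 1000, 1))

    batches, err := stream.Collect(windows)
    if err != nil {
        panic(err)
    }
    for i, batch := range batches {
        total, _ := stream.Sum(batch)
        fmt.Printf("batch %d: sum=%d\n", i, total)
    }
}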
func CrossFlatten ¶
CrossFlatten expands stream fields using cross product (cartesian product) expansion. Creates multiple output records from each input record containing stream fields. Example: {"id": 1, "tags": Stream["a", "b"]} → [{"id": 1, "tags": "a"}, {"id": 1, "tags": "b"}]
func DotFlatten ¶
DotFlatten flattens nested records using dot-product flattening (one output record per input record). Nested records become prefixed fields: {"user": {"name": "Alice"}} → {"user.name": "Alice"}. Stream fields are expanded element-wise (linear, one-to-one mapping); when streams have different lengths, the minimum length is used and excess elements are discarded.
Example with streams: {"id": 1, "tags": Stream["a", "b"], "scores": Stream[10, 20]} → [{"id": 1, "tags": "a", "scores": 10}, {"id": 1, "tags": "b", "scores": 20}]
Example with different lengths: {"short": Stream["a", "b"], "long": Stream[1, 2, 3, 4]} → [{"short": "a", "long": 1}, {"short": "b", "long": 2}] (elements 3 and 4 discarded)
func ExtractField ¶
ExtractField gets a typed field from records
func FullJoin ¶
func FullJoin(rightStream Stream[Record], leftKey, rightKey string, options ...JoinOption) Filter[Record, Record]
FullJoin performs a full outer join between left stream and right stream. All records from both streams are returned, with matching when available. WARNING: Right stream is collected into memory - must be finite and reasonably sized.
func GroupBy ¶
GroupBy groups records by the specified fields and applies custom aggregations to each group.
Returns one record per group containing:
- The key fields from the group
- Any additional aggregations specified
Example:
grouped := stream.GroupBy([]string{"department"})(users)
// Each result contains: department
groupedWithStats := stream.GroupBy([]string{"department"},
stream.AvgField[float64]("avg_salary", "salary"),
stream.CountField("count", "name"))(users)
// Each result contains: department, avg_salary, count
func InnerJoin ¶
func InnerJoin(rightStream Stream[Record], leftKey, rightKey string, options ...JoinOption) Filter[Record, Record]
InnerJoin performs an inner join between left stream and right stream. Only records with matching keys in both streams are returned. WARNING: Right stream is collected into memory - must be finite and reasonably sized.
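A sketch of joining two finite Record streams with InnerJoin and the WithPrefixes option; placeholder import path, error handling abbreviated, and the exact shape of conflict-prefixed fields is left to the implementation.

package main

import (
    "fmt"

    stream "example.com/yourproject/stream" // placeholder module path
)

func main() {
    users, _ := stream.FromMaps([]map[string]any{ // error handling omitted for brevity
        {"id": int64(1), "name": "alice"},
        {"id": int64(2), "name": "bob"},
    })
    orders, _ := stream.FromMaps([]map[string]any{
        {"user_id": int64(1), "total": 9.99},
        {"user_id": int64(1), "total": 24.50},
    })

    // Join orders (left) with users (right) on user_id = id.
    // The right stream is collected into memory, so it must be finite.
    joined := stream.InnerJoin(users, "user_id", "id",
        stream.WithPrefixes("order.", "user."))(orders)

    rows, err := stream.Collect(joined)
    if err != nil {
        panic(err)
    }
    fmt.Println(len(rows)) // expected: 2 (both orders match user 1)
}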
func LeftJoin ¶
func LeftJoin(rightStream Stream[Record], leftKey, rightKey string, options ...JoinOption) Filter[Record, Record]
LeftJoin performs a left join between left stream and right stream. All records from left stream are returned, with matching right records when available. WARNING: Right stream is collected into memory - must be finite and reasonably sized.
func Map ¶
Map transforms each element in a stream with automatic parallelization for large datasets
func RightJoin ¶
func RightJoin(rightStream Stream[Record], leftKey, rightKey string, options ...JoinOption) Filter[Record, Record]
RightJoin performs a right join between left stream and right stream. All records from right stream are returned, with matching left records when available. WARNING: Right stream is collected into memory - must be finite and reasonably sized.
func SessionWindow ¶
func SessionWindow[T any](timeout time.Duration, activityDetector ActivityDetector[T]) Filter[T, Stream[T]]
SessionWindow creates activity-based windows that group related events. Windows extend as long as activity is detected within the timeout period.
func SlidingCountWindow ¶
SlidingCountWindow creates overlapping windows of size windowSize with step stepSize. Each window slides by stepSize elements, creating overlapping batches.
func Sort ¶
Sort sorts elements using a custom comparison function. For finite streams only; infinite streams require windowing.
func SortAsc ¶
func SortAsc[T Comparable]() Filter[T, T]
SortAsc sorts elements in ascending order using Comparable constraint
func SortByDesc ¶
SortByDesc sorts Records by specified fields in descending order
func SortCountWindow ¶
SortCountWindow sorts elements within count-based windows
func SortDesc ¶
func SortDesc[T Comparable]() Filter[T, T]
SortDesc sorts elements in descending order using Comparable constraint
func SortTimeWindow ¶
SortTimeWindow sorts elements within time-based windows
func Split ¶
Split splits a stream of records into substreams based on key fields. Each substream contains all records that share the same key values. Works with both finite and infinite streams using a central dispatcher.
func StreamingAvg ¶
StreamingAvg emits running average as each element arrives.
func StreamingCount ¶
StreamingCount emits running count as each element arrives.
func StreamingGroupBy ¶
StreamingGroupBy maintains running group statistics and emits updates. Unlike regular GroupBy, this works with infinite streams by emitting updated group totals as new records arrive.
func StreamingMax ¶
func StreamingMax[T Comparable]() Filter[T, T]
StreamingMax emits running maximum as each element arrives.
func StreamingMin ¶
func StreamingMin[T Comparable]() Filter[T, T]
StreamingMin emits running minimum as each element arrives.
func StreamingStats ¶
StreamingStats emits comprehensive running statistics for each element. Returns Record with count, sum, avg, min, max.
func StreamingSum ¶
StreamingSum emits running sum continuously as each element arrives. Perfect for real-time dashboards and monitoring.
func TimeWindow ¶
TimeWindow groups elements into time-based windows. Collects elements for the specified duration, then emits as a finite stream.
func TopK ¶
TopK maintains the top K elements using a heap-based approach. Suitable for infinite streams; provides approximate sorting.
func TriggeredWindow ¶
TriggeredWindow creates windows based on trigger conditions
type FloatStream ¶
type GPUExecutor ¶
type GPUExecutor struct {
// contains filtered or unexported fields
}
GPUExecutor implements stream operations using NVIDIA CUDA acceleration. This is a stub implementation; actual CUDA integration will be added when hardware is available.
func NewGPUExecutor ¶
func NewGPUExecutor() *GPUExecutor
NewGPUExecutor creates a new GPU executor if CUDA hardware is available. Returns nil if no CUDA-capable GPU is detected.
func (*GPUExecutor) CanHandle ¶
func (e *GPUExecutor) CanHandle(op Operation, ctx ExecutionContext) bool
CanHandle returns true if GPU can handle the operation efficiently
func (*GPUExecutor) ExecuteFilter ¶
func (e *GPUExecutor) ExecuteFilter(predicate interface{}, input interface{}, ctx ExecutionContext) interface{}
ExecuteFilter performs filter operation using GPU
func (*GPUExecutor) ExecuteMap ¶
func (e *GPUExecutor) ExecuteMap(fn interface{}, input interface{}, ctx ExecutionContext) interface{}
ExecuteMap performs map operation using GPU
func (*GPUExecutor) ExecuteParallel ¶
func (e *GPUExecutor) ExecuteParallel(workers int, fn interface{}, input interface{}, ctx ExecutionContext) interface{}
ExecuteParallel performs parallel processing using GPU
func (*GPUExecutor) ExecuteReduce ¶
func (e *GPUExecutor) ExecuteReduce(fn interface{}, initial interface{}, input interface{}, ctx ExecutionContext) interface{}
ExecuteReduce performs reduce operation using GPU
func (*GPUExecutor) GetScore ¶
func (e *GPUExecutor) GetScore(op Operation, ctx ExecutionContext) int
GetScore returns a score for how well GPU handles the operation
type JSONFormat ¶
type JSONFormat int
JSONFormat specifies how JSON data is structured
const (
JSONLines JSONFormat = iota // Each line is a separate JSON object
JSONArray // Single JSON array containing objects
)
type JSONSink ¶
type JSONSink struct {
Writer io.Writer
Format JSONFormat
Pretty bool
}
JSONSink configuration for writing JSON data
func NewJSONSink ¶
NewJSONSink creates a JSON sink to a writer (defaults to JSON Lines)
func NewJSONSinkToFile ¶
func (*JSONSink) WithFormat ¶
func (sink *JSONSink) WithFormat(format JSONFormat) *JSONSink
WithFormat sets the JSON format
func (*JSONSink) WithPrettyPrint ¶
WithPrettyPrint enables pretty printing
func (*JSONSink) WriteRecords ¶
WriteRecords writes a slice of Records to JSON format
type JSONSource ¶
type JSONSource struct {
Reader io.Reader
Format JSONFormat
}
JSONSource configuration for reading JSON data
func NewJSONSource ¶
func NewJSONSource(reader io.Reader) *JSONSource
NewJSONSource creates a JSON source from a reader (defaults to JSON Lines)
func NewJSONSourceFromFile ¶
func NewJSONSourceFromFile(filename string) (*JSONSource, error)
File-based JSON functions
func (*JSONSource) ToStream ¶
func (js *JSONSource) ToStream() Stream[Record]
ToStream converts JSON data to a Record stream
func (*JSONSource) WithFormat ¶
func (js *JSONSource) WithFormat(format JSONFormat) *JSONSource
WithFormat sets the JSON format
type JoinOption ¶
type JoinOption func(*joinConfig)
JoinOption configures join behavior
func WithPrefixes ¶
func WithPrefixes(leftPrefix, rightPrefix string) JoinOption
WithPrefixes sets custom prefixes for field name conflicts. The defaults are "left." and "right.".
type LateDataPolicy ¶
type LateDataPolicy int
LateDataPolicy defines how to handle data that arrives after window firing
const (
DropLateData LateDataPolicy = iota // Ignore late-arriving data
UpdateWindow // Update window results with late data
SideOutputLate // Send late data to separate stream
)
type Numeric ¶
type Numeric interface {
~int | ~int8 | ~int16 | ~int32 | ~int64 |
~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
~float32 | ~float64
}
Numeric constraint for mathematical operations
type Operation ¶
type Operation struct {
Type OpType
DataType reflect.Type
InputSize int64 // Estimated size (-1 if unknown/infinite)
IsVectorizable bool // Can benefit from SIMD/GPU vectorization
MemoryUsage int64 // Estimated memory usage in bytes
Complexity int // Computational complexity (1=simple, 10=heavy)
}
Operation describes a stream operation with metadata for executor selection
func NewFilterOperation ¶
NewFilterOperation creates metadata for a Filter operation
func NewMapOperation ¶
NewMapOperation creates metadata for a Map operation
type ProtobufFormat ¶
type ProtobufFormat int
ProtobufFormat specifies how protobuf data is structured
const (
ProtobufDelimited ProtobufFormat = iota // Length-delimited messages (streaming)
ProtobufJSON // JSON representation of protobuf
)
type ProtobufSink ¶
type ProtobufSink struct {
Writer io.Writer
MessageDesc protoreflect.MessageDescriptor
Format ProtobufFormat
}
ProtobufSink configuration for writing protobuf data
func NewProtobufSink ¶
func NewProtobufSink(writer io.Writer, messageDesc protoreflect.MessageDescriptor) *ProtobufSink
NewProtobufSink creates a protobuf sink to a writer
func NewProtobufSinkToFile ¶
func NewProtobufSinkToFile(filename string, messageDesc protoreflect.MessageDescriptor) (*ProtobufSink, error)
func (*ProtobufSink) WithFormat ¶
func (sink *ProtobufSink) WithFormat(format ProtobufFormat) *ProtobufSink
WithFormat sets the protobuf format
func (*ProtobufSink) WriteRecords ¶
func (sink *ProtobufSink) WriteRecords(records []Record) error
WriteRecords writes a slice of Records to protobuf format
func (*ProtobufSink) WriteStream ¶
func (sink *ProtobufSink) WriteStream(stream Stream[Record]) error
WriteStream writes a Record stream to protobuf format
type ProtobufSource ¶
type ProtobufSource struct {
Reader io.Reader
MessageDesc protoreflect.MessageDescriptor
Format ProtobufFormat
}
ProtobufSource configuration for reading Protocol Buffer data
func NewProtobufSource ¶
func NewProtobufSource(reader io.Reader, messageDesc protoreflect.MessageDescriptor) *ProtobufSource
NewProtobufSource creates a protobuf source from a reader with a message descriptor
func NewProtobufSourceFromFile ¶
func NewProtobufSourceFromFile(filename string, messageDesc protoreflect.MessageDescriptor) (*ProtobufSource, error)
File-based protobuf functions
func (*ProtobufSource) ToStream ¶
func (ps *ProtobufSource) ToStream() Stream[Record]
ToStream converts protobuf data to a Record stream
func (*ProtobufSource) WithFormat ¶
func (ps *ProtobufSource) WithFormat(format ProtobufFormat) *ProtobufSource
WithFormat sets the protobuf format
type Record ¶
Record represents structured data with native Go types
func Aggregates ¶
func Aggregates[T any](stream Stream[T], specs ...AggregatorSpec[T]) (Record, error)
Aggregates runs multiple named aggregators and returns results in a Record
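A sketch of running several named aggregator specs in one pass with Aggregates; placeholder import path, and the printed Record is described rather than shown exactly.

package main

import (
    "fmt"

    stream "example.com/yourproject/stream" // placeholder module path
)

func main() {
    sales, err := stream.FromMaps([]map[string]any{
        {"region": "eu", "amount": 10.0},
        {"region": "us", "amount": 25.0},
        {"region": "eu", "amount": 5.0},
    })
    if err != nil {
        panic(err)
    }

    // Run several named aggregators in one pass; results come back keyed by name.
    result, err := stream.Aggregates(sales,
        stream.CountField("orders", "amount"),
        stream.SumField[float64]("revenue", "amount"),
        stream.AvgField[float64]("avg_order", "amount"),
    )
    if err != nil {
        panic(err)
    }
    fmt.Println(result) // a Record with "orders", "revenue", and "avg_order" fields
}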
type RecordTimestampExtractor ¶
RecordTimestampExtractor extracts event time from Record elements
func NewRecordTimestampExtractor ¶
func NewRecordTimestampExtractor(fieldName string) RecordTimestampExtractor
NewRecordTimestampExtractor creates a timestamp extractor for Record types that returns standardized UTC timestamps for consistent time comparison
type SessionState ¶
type SessionState[T any] struct {
ID string
StartTime time.Time
LastActivity time.Time
Elements []T
}
SessionState holds the state of a session window
type Stream ¶
Stream represents a generic data stream - the heart of V2
func CSVToStream ¶
CSVToStream reads CSV from a reader and returns a Record stream
func CSVToStreamFromFile ¶
File-based convenience functions for backward compatibility
func FastDelimitedToStreamFromFile ¶
FastDelimitedToStreamFromFile reads delimited file with custom separator
func FastTSVToStream ¶
FastTSVToStream reads delimited data using fast string splitting
func FastTSVToStreamFromFile ¶
FastTSVToStreamFromFile reads TSV file using fast string splitting
func FastTSVToStreamWithSeparator ¶
FastTSVToStreamWithSeparator reads delimited data with custom separator
func FromChannel ¶
FromChannel creates a type-safe stream from a channel (Safe by Default). Only accepts types that are compatible with the Record Value system.
func FromChannelAny ¶
FromChannelAny creates a stream from a channel of any type - USE WITH CAUTION. This bypasses Value type safety; ensure types are compatible with your use case.
func FromMapsUnsafe ¶
FromMapsUnsafe creates a Record stream from maps without validation - USE WITH CAUTION
func FromRecords ¶
FromRecords creates a Record stream with field type validation
func FromRecordsUnsafe ¶
FromRecordsUnsafe creates a Record stream without validation - USE WITH CAUTION
func FromSlice ¶
FromSlice creates a type-safe stream from a slice (Safe by Default). Only accepts types that are compatible with the Record Value system.
func FromSliceAny ¶
FromSliceAny creates a stream from a slice of any type - USE WITH CAUTION. This bypasses Value type safety; ensure types are compatible with your use case.
func GenerateAny ¶
GenerateAny creates a stream using any type generator - USE WITH CAUTION
func JSONToStream ¶
JSONToStream reads JSON from a reader and returns a Record stream (defaults to JSON Lines)
func LegacyTSVToStream ¶
LegacyTSVToStream reads TSV using CSV parser (for backward compatibility)
func ProtobufToStream ¶
func ProtobufToStream(reader io.Reader, messageDesc protoreflect.MessageDescriptor) Stream[Record]
ProtobufToStream reads protobuf from a reader and returns a Record stream
func ProtobufToStreamFromFile ¶
func ProtobufToStreamFromFile(filename string, messageDesc protoreflect.MessageDescriptor) (Stream[Record], error)
func TSVToStream ¶
TSVToStream reads TSV from a reader and returns a Record stream. Now uses fast string splitting for better performance.
type StreamingCSVWriter ¶
type StreamingCSVWriter struct {
// contains filtered or unexported fields
}
StreamingCSVWriter allows writing Records to CSV as they arrive (for infinite streams)
func NewStreamingCSVWriter ¶
func NewStreamingCSVWriter(w io.Writer, headers []string) *StreamingCSVWriter
NewStreamingCSVWriter creates a streaming CSV writer
func (*StreamingCSVWriter) Close ¶
func (scw *StreamingCSVWriter) Close() error
Close flushes and closes the writer
func (*StreamingCSVWriter) WriteRecord ¶
func (scw *StreamingCSVWriter) WriteRecord(record Record) error
WriteRecord writes a single record to the CSV stream
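A sketch of writing records as they arrive by combining StreamingCSVWriter with ForEach; placeholder import path, and per-record write errors are ignored for brevity.

package main

import (
    "os"

    stream "example.com/yourproject/stream" // placeholder module path
)

func main() {
    recs, err := stream.FromMaps([]map[string]any{
        {"sensor": "t1", "value": 21.5},
        {"sensor": "t2", "value": 19.8},
    })
    if err != nil {
        panic(err)
    }

    // Write each record as it arrives instead of buffering the whole stream.
    w := stream.NewStreamingCSVWriter(os.Stdout, []string{"sensor", "value"})
    defer w.Close()

    err = stream.ForEach(func(r stream.Record) {
        _ = w.WriteRecord(r) // write errors ignored here for brevity
    })(recs)
    if err != nil {
        panic(err)
    }
}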
type StringStream ¶
type TimestampedRecord ¶
TimestampedRecord pairs a Record with its event timestamp
type TypedRecord ¶
type TypedRecord struct {
// contains filtered or unexported fields
}
TypedRecord provides type-safe field setting with method chaining
func (*TypedRecord) Bool ¶
func (tr *TypedRecord) Bool(key string, value bool) *TypedRecord
func (*TypedRecord) Build ¶
func (tr *TypedRecord) Build() Record
Build returns the completed Record
func (*TypedRecord) Float ¶
func (tr *TypedRecord) Float(key string, value float64) *TypedRecord
func (*TypedRecord) Int ¶
func (tr *TypedRecord) Int(key string, value int64) *TypedRecord
func (*TypedRecord) Record ¶
func (tr *TypedRecord) Record(key string, value Record) *TypedRecord
func (*TypedRecord) Set ¶
func (tr *TypedRecord) Set(key string, value any) *TypedRecord
Set adds a field with compile-time type safety
func (*TypedRecord) String ¶
func (tr *TypedRecord) String(key string, value string) *TypedRecord
Convenience methods for common types with type safety
func (*TypedRecord) Time ¶
func (tr *TypedRecord) Time(key string, value time.Time) *TypedRecord
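A sketch of the TypedRecord method-chaining API. Note this index does not show how a *TypedRecord is first obtained; NewTypedRecord below is a hypothetical constructor used only to illustrate the documented builder methods.

// NewTypedRecord is hypothetical - the actual constructor is not shown in this index.
rec := stream.NewTypedRecord().
    String("name", "alice").
    Int("age", 30).
    Bool("active", true).
    Time("joined", time.Now()).
    Build()
// rec is a Record with the four fields set.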
type Value ¶
type Value interface {
~int | ~int8 | ~int16 | ~int32 | ~int64 |
~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
~float32 | ~float64 |
~bool | ~string | time.Time |
Record |
Stream[int] | Stream[int8] | Stream[int16] | Stream[int32] | Stream[int64] |
Stream[uint] | Stream[uint8] | Stream[uint16] | Stream[uint32] | Stream[uint64] |
Stream[float32] | Stream[float64] |
Stream[bool] | Stream[string] | Stream[time.Time] |
Stream[Record]
}
Value defines the compile-time type constraint for Record field values. This replaces runtime reflection with compile-time type safety.
type ValueChangeTrigger ¶
type ValueChangeTrigger[T any] struct {
ExtractFunc func(T) any
// contains filtered or unexported fields
}
ValueChangeTrigger fires when a specific field value changes
func NewValueChangeTrigger ¶
func NewValueChangeTrigger[T any](extractFunc func(T) any) *ValueChangeTrigger[T]
func (*ValueChangeTrigger[T]) ResetState ¶
func (vct *ValueChangeTrigger[T]) ResetState() any
func (*ValueChangeTrigger[T]) ShouldFire ¶
func (vct *ValueChangeTrigger[T]) ShouldFire(element T, state any) bool
type WatermarkGenerator ¶
WatermarkGenerator generates watermarks based on observed event times
func BoundedOutOfOrdernessWatermark ¶
func BoundedOutOfOrdernessWatermark(maxLateness time.Duration) WatermarkGenerator
BoundedOutOfOrdernessWatermark generates watermarks allowing specified lateness
func PeriodicWatermarkGenerator ¶
func PeriodicWatermarkGenerator(interval time.Duration, baseGenerator WatermarkGenerator) WatermarkGenerator
PeriodicWatermarkGenerator creates a watermark generator that updates periodically
type WatermarkTracker ¶
type WatermarkTracker struct {
// contains filtered or unexported fields
}
WatermarkTracker manages watermark progression for event-time processing
func NewWatermarkTracker ¶
func NewWatermarkTracker(generator WatermarkGenerator) *WatermarkTracker
NewWatermarkTracker creates a new watermark tracker
func (*WatermarkTracker) GetWatermark ¶
func (wt *WatermarkTracker) GetWatermark() time.Time
GetWatermark returns the current watermark
func (*WatermarkTracker) UpdateWatermark ¶
func (wt *WatermarkTracker) UpdateWatermark(eventTime time.Time) time.Time
UpdateWatermark processes a new event time and potentially advances the watermark
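A sketch of tracking watermarks with a bounded-out-of-orderness generator, using the documented constructors and methods; placeholder import path.

package main

import (
    "fmt"
    "time"

    stream "example.com/yourproject/stream" // placeholder module path
)

func main() {
    // Allow events to arrive up to 30 seconds out of order.
    tracker := stream.NewWatermarkTracker(stream.BoundedOutOfOrdernessWatermark(30 * time.Second))

    for _, ts := range []time.Time{
        time.Date(2024, 1, 1, 10, 0, 10, 0, time.UTC),
        time.Date(2024, 1, 1, 10, 0, 5, 0, time.UTC), // out of order, within the allowed bound
        time.Date(2024, 1, 1, 10, 1, 0, 0, time.UTC),
    } {
        wm := tracker.UpdateWatermark(ts)
        fmt.Println("watermark:", wm)
    }
    fmt.Println("current:", tracker.GetWatermark())
}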
type WindowBuilder ¶
type WindowBuilder[T any] struct {
// contains filtered or unexported fields
}
WindowBuilder provides a fluent API for configuring advanced windows
func Window ¶
func Window[T any]() *WindowBuilder[T]
Window creates a new window builder for advanced windowing configuration
func (*WindowBuilder[T]) AccumulationMode ¶
func (wb *WindowBuilder[T]) AccumulationMode() *WindowBuilder[T]
AccumulationMode sets the accumulation mode for late data
func (*WindowBuilder[T]) AllowLateness ¶
func (wb *WindowBuilder[T]) AllowLateness(lateness time.Duration) *WindowBuilder[T]
AllowLateness configures how late data should be handled
func (*WindowBuilder[T]) Apply ¶
func (wb *WindowBuilder[T]) Apply() Filter[T, Stream[T]]
Apply creates the windowing filter with the configured settings
func (*WindowBuilder[T]) DiscardingMode ¶
func (wb *WindowBuilder[T]) DiscardingMode() *WindowBuilder[T]
DiscardingMode sets the discarding mode for late data (default)
func (*WindowBuilder[T]) TriggerOn ¶
func (wb *WindowBuilder[T]) TriggerOn(trigger AdvancedTrigger[T]) *WindowBuilder[T]
TriggerOn adds a custom trigger to the window
func (*WindowBuilder[T]) TriggerOnCount ¶
func (wb *WindowBuilder[T]) TriggerOnCount(count int) *WindowBuilder[T]
TriggerOnCount adds a count-based trigger to the window
func (*WindowBuilder[T]) TriggerOnProcessingTime ¶
func (wb *WindowBuilder[T]) TriggerOnProcessingTime(interval time.Duration) *WindowBuilder[T]
TriggerOnProcessingTime adds a processing time trigger
func (*WindowBuilder[T]) TriggerOnTime ¶
func (wb *WindowBuilder[T]) TriggerOnTime(duration time.Duration) *WindowBuilder[T]
TriggerOnTime adds a time-based trigger to the window
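A sketch of the fluent WindowBuilder API using only methods documented above; the import path is a placeholder, the window filter is assumed to be applied as a function, and the window count for this small input depends on how the implementation handles end of stream.

package main

import (
    "fmt"

    stream "example.com/yourproject/stream" // placeholder module path
)

func main() {
    // Fire a window every 2 elements, discarding late data (the default).
    windowFilter := stream.Window[stream.Record]().
        TriggerOnCount(2).
        DiscardingMode().
        Apply()

    recs, err := stream.FromMaps([]map[string]any{
        {"event": "a"}, {"event": "b"}, {"event": "c"}, {"event": "d"},
    })
    if err != nil {
        panic(err)
    }

    windows, err := stream.Collect(windowFilter(recs))
    if err != nil {
        panic(err)
    }
    fmt.Println("windows:", len(windows)) // expected: 2 windows of 2 records each
}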