Documentation ¶
Index ¶
- func IsExpandError(err error) bool
- func IsFilterError(err error) bool
- func IsKeyByError(err error) bool
- func IsMapError(err error) bool
- func IsRunError(err error) bool
- func IsSinkError(err error) bool
- func IsWindowError(err error) bool
- func JumpHash[K comparable](k K, shardCount int) uint64
- func ModulusHash[K comparable](k K, shardCount int) uint64
- type Closeable
- type DataStream
- func Expand[T any, U any](ds DataStream[T], expandFunc ExpandFunc[T, U], params ...Params) DataStream[U]
- func Map[T any, U any](ds DataStream[T], transformFunc TransformFunc[T, U], params ...Params) DataStream[U]
- func New[T any](ctx context.Context, inStream <-chan T, errStream chan<- error) DataStream[T]
- func Window[T any, K comparable, R any](keyedDs KeyedDataStream[T, K], wf WindowFunc[T, R], ...) DataStream[R]
- func (p DataStream[T]) Broadcast(params ...Params) DataStream[T]
- func (p DataStream[T]) FanIn(params ...Params) DataStream[T]
- func (p DataStream[T]) FanOut(params ...Params) DataStream[T]
- func (p DataStream[T]) Filter(filter FilterFunc[T], params ...Params) DataStream[T]
- func (p DataStream[T]) Listen(listenerIndex int, params ...Params) DataStream[T]
- func (p DataStream[T]) OrDone(params ...Params) DataStream[T]
- func (p DataStream[T]) Out() <-chan T
- func (p DataStream[T]) Run(proc ProcessFunc[T], params ...Params) DataStream[T]
- func (p DataStream[T]) Sink(sinker Sinker[T], params ...Params) DataStream[T]
- func (p DataStream[T]) Take(params ...Params) DataStream[T]
- func (p DataStream[T]) Tee(params ...Params) (DataStream[T], DataStream[T])
- func (p DataStream[T]) WithWaitGroup(wg *sync.WaitGroup) DataStream[T]
- type Error
- type ErrorCode
- type ExpandFunc
- type FilterFunc
- type KeyFunc
- type Keyable
- type KeyableElement
- type KeyedDataStream
- type Params
- type Partition
- type Partitioner
- type ProcessFunc
- type ShardKeyFunc
- type ShardedStore
- type ShardedStoreOpts
- type Sinker
- type Sourcer
- type TimeMarker
- type TimedKeyableElement
- type TransformFunc
- type WatermarkGenerator
- type WindowFunc
- type WindowedPartitioner
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsExpandError ¶ added in v0.0.7
IsExpandError checks if the given error is an EXPAND error. It returns true if the error is an EXPAND error, otherwise false.
func IsFilterError ¶
IsFilterError checks if the given error is a FILTER error. It returns true if the error is a FILTER error, otherwise false.
func IsKeyByError ¶ added in v0.0.8
IsKeyByError checks if the given error is a KEY_BY error. It returns true if the error is a KEY_BY error, otherwise false.
func IsMapError ¶
IsMapError checks if the given error is a MAP error. It returns true if the error is a MAP error, otherwise false.
func IsRunError ¶
IsRunError checks if the given error is a RUN error. It returns true if the error is a RUN error, otherwise false.
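func IsSinkError ¶
IsSinkError checks if the given error is a SINK error. It returns true if the error is a SINK error, otherwise false.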
func IsWindowError ¶ added in v0.0.8
IsWindowError checks if the given error is a WINDOW error. It returns true if the error is a WINDOW error, otherwise false.
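The Is*Error helpers above all answer the same question: does this error carry a particular stage code? The fields of the package's Error and ErrorCode types are not shown in this reference, so the following is only a sketch of how such a check could be built on errors.As. The names stageError, errorCode, codeMap, codeFilter, hasCode, isMapError, and isFilterError are hypothetical stand-ins, not the package's API.

```go
package main

import (
	"errors"
	"fmt"
)

// errorCode and stageError are hypothetical stand-ins for the package's
// ErrorCode and Error types, whose fields are not shown in this reference.
type errorCode int

const (
	codeMap errorCode = iota + 1
	codeFilter
)

type stageError struct {
	code errorCode
	err  error
}

func (e *stageError) Error() string {
	return fmt.Sprintf("stage error (code %d): %v", e.code, e.err)
}

// Unwrap exposes the underlying cause to errors.Is and errors.As.
func (e *stageError) Unwrap() error { return e.err }

// hasCode reports whether the first *stageError found in err's chain
// carries the given code.
func hasCode(err error, code errorCode) bool {
	var se *stageError
	return errors.As(err, &se) && se.code == code
}

func isMapError(err error) bool    { return hasCode(err, codeMap) }
func isFilterError(err error) bool { return hasCode(err, codeFilter) }

func main() {
	cause := &stageError{code: codeMap, err: errors.New("bad input")}
	err := fmt.Errorf("pipeline failed: %w", cause)
	fmt.Println(isMapError(err), isFilterError(err))
}
```

Because the check goes through errors.As, it still matches after the error has been wrapped with %w, which is useful when errors pass through several pipeline stages before reaching the error channel.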
func JumpHash ¶ added in v0.0.10
func JumpHash[K comparable](k K, shardCount int) uint64
JumpHash implements Jump Consistent Hash, a fast, minimalist algorithm from a 2014 Google paper by Lamping and Veach.
Its primary advantage is that it requires the minimum number of keys to be remapped when the number of shards (buckets) changes, unlike modulus hashing.
It works by converting the key into a 64-bit integer which is used as a random seed. It then deterministically "jumps" forward through bucket indices until it lands on the one designated for the key within the given shardCount.
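The "jump" loop described above can be sketched as follows. This is a minimal version of the published algorithm that takes an already-hashed 64-bit key; how the package converts a generic key K into that integer is not shown here, so that step is left out. The name jumpHash and the -1 return for a non-positive bucket count are illustrative choices.

```go
package main

import "fmt"

// jumpHash maps a 64-bit key to a bucket index in [0, numBuckets) using the
// loop from the Jump Consistent Hash paper. It returns -1 for a non-positive
// bucket count (a choice made for this sketch only).
func jumpHash(key uint64, numBuckets int) int {
	if numBuckets <= 0 {
		return -1
	}
	var b, j int64 = -1, 0
	for j < int64(numBuckets) {
		b = j
		// Advance a linear congruential generator seeded by the key.
		key = key*2862933555777941757 + 1
		// Jump forward to the next candidate bucket.
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return int(b)
}

func main() {
	moved := 0
	for k := uint64(0); k < 10000; k++ {
		// Multiply by a large odd constant to spread the sequential demo keys.
		key := k * 0x9E3779B97F4A7C15
		if jumpHash(key, 10) != jumpHash(key, 11) {
			moved++
		}
	}
	fmt.Printf("%d of 10000 keys moved when growing from 10 to 11 buckets\n", moved)
}
```

When the bucket count grows from n to n+1, a key either keeps its bucket or moves to the new bucket n, so on average only about 1/(n+1) of the keys are remapped.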
func ModulusHash ¶ added in v0.0.10
func ModulusHash[K comparable](k K, shardCount int) uint64
ModulusHash implements a simple sharding strategy using the modulo operator (%).
It works by converting the key into a 64-bit hash and then finding the remainder when that hash is divided by the total number of shards.
While very fast, its major drawback is that changing the shard count causes most keys to be remapped to new shards. It is best used when the number of shards is fixed and will not change.
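A minimal sketch of the modulus strategy follows. The package's own key-to-hash conversion is not shown in this reference, so hashing the key's default formatting with FNV-1a is an assumption made for illustration, as is the name modulusHash and the fallback to one shard for a non-positive count.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// modulusHash hashes the key's default string formatting with FNV-1a and
// reduces it modulo shardCount. The hashing step is an assumption; only the
// modulo reduction is taken from the description above.
func modulusHash[K comparable](k K, shardCount int) uint64 {
	if shardCount <= 0 {
		shardCount = 1 // sketch-only fallback to a single shard
	}
	h := fnv.New64a()
	fmt.Fprint(h, k) // write the formatted key into the hash
	return h.Sum64() % uint64(shardCount)
}

func main() {
	moved := 0
	for i := 0; i < 1000; i++ {
		key := fmt.Sprintf("user-%d", i)
		if modulusHash(key, 10) != modulusHash(key, 11) {
			moved++
		}
	}
	fmt.Printf("%d of 1000 keys changed shard going from 10 to 11 shards\n", moved)
}
```

Running the comparison in main makes the drawback concrete: with a well-mixed hash, most keys land on a different shard once the shard count changes.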
Types ¶
type Closeable ¶ added in v0.0.10
type Closeable interface {
	// Close signals the component to flush any remaining data and shut down.
	// It should be called when the component is no longer needed.
	Close()
}
type DataStream ¶
type DataStream[T any] struct {
	// contains filtered or unexported fields
}
DataStream is a struct that defines a generic stream-processing stage. It manages one or more input channels (inStreams) and a shared error stream.
func Expand ¶ added in v0.0.7
func Expand[T any, U any]( ds DataStream[T], expandFunc ExpandFunc[T, U], params ...Params, ) DataStream[U]
func Map ¶
func Map[T any, U any]( ds DataStream[T], transformFunc TransformFunc[T, U], params ...Params, ) DataStream[U]
Map is a package-level function that transforms each item from T to U using a TransformFunc.
func New ¶
func New[T any]( ctx context.Context, inStream <-chan T, errStream chan<- error, ) DataStream[T]
New constructs a new DataStream of a given type by passing in a context, an input channel, and an error channel. Additional channels can be introduced internally via transformations like FanOut.
func Window ¶ added in v0.0.8
func Window[T any, K comparable, R any]( keyedDs KeyedDataStream[T, K], wf WindowFunc[T, R], partitioner Partitioner[T, K], param ...Params, ) DataStream[R]
Window applies a windowing function to a KeyedDataStream, producing a new DataStream[R] with re-partitioned output. It uses the provided WindowFunc, Partitioner[T, K], and optional Params to define the windowing behavior.
func (DataStream[T]) Broadcast ¶
func (p DataStream[T]) Broadcast( params ...Params, ) DataStream[T]
Broadcast sends each item to param.Num new channels, effectively duplicating every item across all output channels.
func (DataStream[T]) FanIn ¶
func (p DataStream[T]) FanIn( params ...Params, ) DataStream[T]
FanIn merges a slice of input channels into a single output channel.
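The merge described above is the classic fan-in pattern. The sketch below uses plain channels rather than the DataStream type, whose internals are not shown here. The name fanIn is illustrative, and context cancellation is left out to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn merges every input channel into one output channel. The output is
// closed once all inputs have been closed and drained.
func fanIn[T any](ins ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		// One forwarding goroutine per input channel.
		go func(c <-chan T) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(in)
	}
	// Close the output only after every forwarder has finished.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	a, b := make(chan int, 2), make(chan int, 2)
	a <- 1
	a <- 2
	b <- 3
	b <- 4
	close(a)
	close(b)

	sum := 0
	for v := range fanIn[int](a, b) {
		sum += v
	}
	fmt.Println("sum of merged items:", sum)
}
```

Ordering across inputs is not preserved: items from different channels interleave in whatever order the forwarding goroutines are scheduled.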
func (DataStream[T]) FanOut ¶
func (p DataStream[T]) FanOut( params ...Params, ) DataStream[T]
FanOut duplicates the number of output channels by param.Num, distributing incoming items in a round-robin manner across all new channels.
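Round-robin distribution can be sketched with plain channels as below. This is not the package's implementation: fanOut is a hypothetical helper, and the per-output buffer of 4 exists only so the demo can drain the outputs one after another. Real consumers would normally read each output concurrently.

```go
package main

import "fmt"

// fanOut distributes items from in across n output channels in round-robin
// order and closes every output once in is closed.
func fanOut[T any](in <-chan T, n int) []<-chan T {
	if n < 1 {
		n = 1
	}
	chans := make([]chan T, n)
	outs := make([]<-chan T, n)
	for i := range chans {
		chans[i] = make(chan T, 4) // demo-sized buffer (assumption)
		outs[i] = chans[i]
	}
	go func() {
		defer func() {
			for _, c := range chans {
				close(c)
			}
		}()
		i := 0
		for v := range in {
			chans[i] <- v
			i = (i + 1) % n // advance to the next output channel
		}
	}()
	return outs
}

func main() {
	in := make(chan int, 6)
	for i := 0; i < 6; i++ {
		in <- i
	}
	close(in)

	for i, out := range fanOut[int](in, 3) {
		var got []int
		for v := range out {
			got = append(got, v)
		}
		fmt.Println("output", i, "received", got)
	}
}
```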
func (DataStream[T]) Filter ¶
func (p DataStream[T]) Filter(filter FilterFunc[T], params ...Params) DataStream[T]
Filter applies a user defined function to each value in the input stream(s) and only returns values that pass the filter check (true). If an error occurs, the item is dropped.
func (DataStream[T]) Listen ¶ added in v0.0.9
func (p DataStream[T]) Listen( listenerIndex int, params ...Params, ) DataStream[T]
Listen allows you to subscribe to a DataStream, effectively creating a new DataStream that listens to the same input channel. This allows you to broadcast the same input stream to multiple listeners.
func (DataStream[T]) OrDone ¶
func (p DataStream[T]) OrDone( params ...Params, ) DataStream[T]
OrDone terminates if the input stream is closed or context is done, effectively passing items through until the upstream channel signals completion.
func (DataStream[T]) Out ¶
func (p DataStream[T]) Out() <-chan T
Out returns the single output channel of this DataStream. If the DataStream has multiple input channels, it automatically FanIns them into a single output.
func (DataStream[T]) Run ¶
func (p DataStream[T]) Run( proc ProcessFunc[T], params ...Params, ) DataStream[T]
Run executes a user defined process function on the input stream(s). Each input channel is handled in its own goroutine, writing processed results to a newly created set of output channels. Errors can be skipped if SkipError is set.
func (DataStream[T]) Sink ¶
func (p DataStream[T]) Sink( sinker Sinker[T], params ...Params, ) DataStream[T]
Sink outputs DataStream values to a defined Sinker in a separate goroutine. This allows the pipeline to continue processing asynchronously.
func (DataStream[T]) Take ¶
func (p DataStream[T]) Take( params ...Params, ) DataStream[T]
Take returns only the first N items from the input streams. If multiple input streams exist, each is read up to N items, meaning total items could be N * numberOfStreams.
func (DataStream[T]) Tee ¶
func (p DataStream[T]) Tee( params ...Params, ) (DataStream[T], DataStream[T])
Tee splits values coming in from a single channel so that you can send them off into two separate DataStream outputs.
func (DataStream[T]) WithWaitGroup ¶
func (p DataStream[T]) WithWaitGroup(wg *sync.WaitGroup) DataStream[T]
WithWaitGroup attaches a WaitGroup to this DataStream, returning a copy.
type ErrorCode ¶
type ErrorCode int
ErrorCode represents a generic enrich ErrorCode
type ExpandFunc ¶ added in v0.0.7
ExpandFunc is a user-defined function type used in a given DataStream stage. This function type is used to expand a given input into multiple outputs.
type FilterFunc ¶
FilterFunc is a user-defined function type used in a given DataStream stage. This function type is used to filter a given input type.
type KeyFunc ¶ added in v0.0.8
type KeyFunc[T any, K comparable] func(T) K
KeyFunc is a user defined function that takes a value of type T and returns a key of type K for the given value. This function is used to partition pipeline data streams by key.
type Keyable ¶ added in v0.0.10
type Keyable[K comparable] interface { Key() K }
type KeyableElement ¶ added in v0.0.10
type KeyableElement[T any, K comparable] interface {
	Keyable[K]
	Value() T
}
func NewKeyedElement ¶ added in v0.0.10
func NewKeyedElement[T any, K comparable](key K, value T) KeyableElement[T, K]
NewKeyedElement creates a new KeyableElement with the given key and value.
type KeyedDataStream ¶ added in v0.0.8
type KeyedDataStream[T any, K comparable] struct {
	DataStream[KeyableElement[T, K]]
	// contains filtered or unexported fields
}
KeyedDataStream represents a stream of data elements partitioned by a key of type K, derived using a key function.
func KeyBy ¶ added in v0.0.8
func KeyBy[T any, K comparable]( ds DataStream[T], keyFunc KeyFunc[T, K], params ...Params, ) KeyedDataStream[T, K]
KeyBy partitions a DataStream into multiple channels (one channel per computed key) so that all items with the same key travel through the same channel.
func (KeyedDataStream[T, K]) WithTimeMarker ¶ added in v0.0.8
func (k KeyedDataStream[T, K]) WithTimeMarker( timeMarker TimeMarker, ) KeyedDataStream[T, K]
WithTimeMarker attaches a time marker to the KeyedDataStream.
func (KeyedDataStream[T, K]) WithWatermarkGenerator ¶ added in v0.0.8
func (k KeyedDataStream[T, K]) WithWatermarkGenerator( waterMarker WatermarkGenerator[T], ) KeyedDataStream[T, K]
WithWatermarkGenerator attaches a watermark generator to the KeyedDataStream.
type Partition ¶ added in v0.0.10
type Partition[T any, K comparable] interface {
	// Push sends a new element into the partition for processing.
	Push(item TimedKeyableElement[T, K])
}
Partition represents a single, active partition for a specific key. It is an active component responsible for its own logic.
type Partitioner ¶ added in v0.0.10
type Partitioner[T any, K comparable] interface {
	// Create initializes and starts a new, active Partition instance.
	// It takes a context for cancellation and an output channel where it will
	// send its results (e.g., slices of T for windows).
	Create(ctx context.Context, out chan<- []T) Partition[T, K]
	Closeable
}
Partitioner acts as a template or factory for creating new partitions. A single Partitioner (e.g., a configured sliding window) is used to create all active partition instances for different keys. TODO: Decouple Partitioning from Windowing (ref GitHub issue #61).
type ProcessFunc ¶
ProcessFunc is a user-defined function type used in a given DataStream stage.
type ShardKeyFunc ¶ added in v0.0.10
type ShardKeyFunc[K comparable] func(k K, shardCount int) uint64
ShardKeyFunc defines the function signature for a sharding algorithm. It takes a key and the total number of shards and must return a deterministic shard index (typically in the range [0, shardCount-1]). Implementations can include modulus hashing, jump hashing, or other consistent hashing algorithms.
type ShardedStore ¶ added in v0.0.10
type ShardedStore[T any, K comparable] struct {
	*ShardedStoreOpts[K]
	// contains filtered or unexported fields
}
ShardedStore is a thread-safe, in-memory generic key-value store. It distributes data across a fixed number of internal shards to reduce lock contention and improve concurrency in high-throughput environments. The sharding strategy is determined by the provided ShardKeyFunc.
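The sharding idea can be illustrated with a simplified store that keeps one sync.Map per shard and picks the shard with a caller-supplied function. Everything below (shardedStore, newShardedStore, byLength, and a generic value type V instead of Partition) is a hypothetical stand-in for illustration, not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// shardedStore is a simplified, hypothetical stand-in for ShardedStore.
type shardedStore[K comparable, V any] struct {
	shards  []sync.Map
	shardOf func(k K, shardCount int) uint64
}

func newShardedStore[K comparable, V any](n int, shardOf func(K, int) uint64) *shardedStore[K, V] {
	if n <= 0 {
		n = 1 // fall back to a single shard
	}
	return &shardedStore[K, V]{shards: make([]sync.Map, n), shardOf: shardOf}
}

// index clamps the shard function's result into the valid range.
func (s *shardedStore[K, V]) index(k K) int {
	return int(s.shardOf(k, len(s.shards)) % uint64(len(s.shards)))
}

// Set stores v under k and returns the index of the shard that was used.
func (s *shardedStore[K, V]) Set(k K, v V) int {
	i := s.index(k)
	s.shards[i].Store(k, v)
	return i
}

// Get returns the value for k, or the zero value and false if k is absent.
func (s *shardedStore[K, V]) Get(k K) (V, bool) {
	var zero V
	raw, ok := s.shards[s.index(k)].Load(k)
	if !ok {
		return zero, false
	}
	return raw.(V), true
}

// Keys collects the keys from every shard.
func (s *shardedStore[K, V]) Keys() []K {
	var keys []K
	for i := range s.shards {
		s.shards[i].Range(func(k, _ any) bool {
			keys = append(keys, k.(K))
			return true
		})
	}
	return keys
}

// byLength is a toy shard function used only for this demo.
func byLength(k string, shardCount int) uint64 {
	return uint64(len(k)) % uint64(shardCount)
}

func main() {
	store := newShardedStore[string, int](4, byLength)
	fmt.Println("stored in shard", store.Set("abc", 7))
	v, ok := store.Get("abc")
	fmt.Println(v, ok, len(store.Keys()))
}
```

Splitting the keyspace this way means writers to different shards do not contend with each other, which is the motivation given above for sharding.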
func NewShardedPartitionStore ¶ added in v0.0.10
func NewShardedPartitionStore[T any, K comparable]( opts *ShardedStoreOpts[K], ) *ShardedStore[T, K]
NewShardedPartitionStore creates and initializes a new ShardedStore with the provided options. If the ShardCount in the options is zero, it defaults to 1 to ensure a functional store.
func (*ShardedStore[T, K]) Close ¶ added in v0.0.10
func (s *ShardedStore[T, K]) Close()
Close clears all entries in the ShardedStore. It iterates through all shards and deletes each key-value pair.
func (*ShardedStore[T, K]) Get ¶ added in v0.0.10
func (s *ShardedStore[T, K]) Get(k K) (Partition[T, K], bool)
Get retrieves a Partition[T, K] by its key K from the store. It returns the Partition and a boolean indicating whether the key was found. If the key does not exist, it returns a zero value of Partition[T, K] and false.
func (*ShardedStore[T, K]) Initialize ¶ added in v0.0.10
func (s *ShardedStore[T, K]) Initialize()
func (*ShardedStore[T, K]) Keys ¶ added in v0.0.10
func (s *ShardedStore[T, K]) Keys() []K
Keys returns a slice of all keys currently stored in the ShardedStore. It iterates through all shards and collects the keys from each shard's sync.Map.
func (*ShardedStore[T, K]) Set ¶ added in v0.0.10
func (s *ShardedStore[T, K]) Set(k K, v Partition[T, K]) (shardIndex int)
Set stores a Partition[T, K] under the key K in the store. It returns the index of the shard where the value was stored. The shard index is determined by the ShardKeyFunc provided in the store's options. If the key already exists, it updates the value in the corresponding shard.
type ShardedStoreOpts ¶ added in v0.0.10
type ShardedStoreOpts[K comparable] struct {
	// ShardCount is the total number of concurrent shards to create.
	// Must be greater than zero.
	ShardCount int
	// ShardKeyFunc is the hashing function used to map a key to a shard index.
	ShardKeyFunc ShardKeyFunc[K]
}
ShardedStoreOpts provides the configuration for creating a new ShardedStore.
type Sinker ¶
type Sinker[T any] interface { Sink(ctx context.Context, ds DataStream[T]) error }
Sinker is an interface that defines the Sink method.
type Sourcer ¶
type Sourcer[T any] interface { Source(ctx context.Context, errSender chan<- error) DataStream[T] }
Sourcer is an interface that defines the Source method.
type TimeMarker ¶ added in v0.0.10
type TimedKeyableElement ¶ added in v0.0.10
type TimedKeyableElement[T any, K comparable] interface {
	KeyableElement[T, K]
	Time() time.Time
}
func NewTimedKeyedElement ¶ added in v0.0.10
func NewTimedKeyedElement[T any, K comparable](v KeyableElement[T, K], t time.Time) TimedKeyableElement[T, K]
NewTimedKeyedElement creates a new TimedKeyableElement from the given KeyableElement (key and value) and time.
type TransformFunc ¶
TransformFunc is a user-defined function type used in a given DataStream stage. This function type is used to transform a given input type to a given output type.
type WatermarkGenerator ¶ added in v0.0.10
type WatermarkGenerator[T any] interface {
	// OnEvent is called whenever a new event arrives, allowing the generator to update its state.
	// Event: the event that arrived
	// EventTime: the time of the event
	OnEvent(event T, eventTime time.Time)
	// GetWatermark returns the current watermark.
	GetWatermark() time.Time
}
type WindowFunc ¶ added in v0.0.8
WindowFunc processes a given batch of data and returns a result. You can use this function in conjunction with the KeyedDataStream to perform windowed operations on batches of data with a given key.