datastreams

package
v0.0.13
Published: Oct 9, 2025 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func IsExpandError added in v0.0.7

func IsExpandError(err error) bool

IsExpandError checks if the given error is an EXPAND error. It returns true if the error is an EXPAND error, otherwise false.

func IsFilterError

func IsFilterError(err error) bool

IsFilterError checks if the given error is a FILTER error. It returns true if the error is a FILTER error, otherwise false.

func IsKeyByError added in v0.0.8

func IsKeyByError(err error) bool

IsKeyByError checks if the given error is a KEY_BY error. It returns true if the error is a KEY_BY error, otherwise false.

func IsMapError

func IsMapError(err error) bool

IsMapError checks if the given error is a MAP error. It returns true if the error is a MAP error, otherwise false.

func IsRunError

func IsRunError(err error) bool

IsRunError checks if the given error is a RUN error. It returns true if the error is a RUN error, otherwise false.

func IsSinkError

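func IsSinkError(err error) bool

IsSinkError checks if the given error is a SINK error. It returns true if the error is a SINK error, otherwise false.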

func IsWindowError added in v0.0.8

func IsWindowError(err error) bool

IsWindowError checks if the given error is a WINDOW error. It returns true if the error is a WINDOW error, otherwise false.

func JumpHash added in v0.0.10

func JumpHash[K comparable](k K, shardCount int) uint64

JumpHash implements Jump Consistent Hash, a fast, minimalist algorithm from a 2014 Google paper by Lamping and Veach.

Its primary advantage is that it requires the minimum number of keys to be remapped when the number of shards (buckets) changes, unlike modulus hashing.

It works by converting the key into a 64-bit integer which is used as a random seed. It then deterministically "jumps" forward through bucket indices until it lands on the one designated for the key within the given shardCount.

func ModulusHash added in v0.0.10

func ModulusHash[K comparable](k K, shardCount int) uint64

ModulusHash implements a simple sharding strategy using the modulo operator (%).

It works by converting the key into a 64-bit hash and then finding the remainder when that hash is divided by the total number of shards.

While very fast, its major drawback is that changing the shard count causes most keys to be remapped to new shards. It is best used when the number of shards is fixed and will not change.
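The remapping drawback can be observed with a small sketch. It is not the package's implementation: hashKey, modulusHash, and countRemapped are hypothetical helpers, and FNV-1a is an assumed choice of 64-bit hash.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashKey converts a string key to a 64-bit hash with FNV-1a.
// Assumption: the package may hash keys differently.
func hashKey(key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error.
	return h.Sum64()
}

// modulusHash maps a 64-bit hash to a shard with the remainder operator.
func modulusHash(hash uint64, shardCount int) uint64 {
	if shardCount < 1 {
		shardCount = 1 // guard chosen for this sketch only
	}
	return hash % uint64(shardCount)
}

// countRemapped reports how many keys land on a different shard
// when the shard count changes from "from" to "to".
func countRemapped(keys []string, from, to int) int {
	moved := 0
	for _, k := range keys {
		h := hashKey(k)
		if modulusHash(h, from) != modulusHash(h, to) {
			moved++
		}
	}
	return moved
}

func main() {
	keys := make([]string, 1000)
	for i := range keys {
		keys[i] = fmt.Sprintf("key-%d", i)
	}
	fmt.Printf("%d of %d keys moved when growing from 10 to 11 shards\n",
		countRemapped(keys, 10, 11), len(keys))
}
```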

Types

type Closeable added in v0.0.10

type Closeable interface {
	// Close signals the component to flush any remaining data and shut down.
	// It should be called when the component is no longer needed.
	Close()
}

type DataStream

type DataStream[T any] struct {
	// contains filtered or unexported fields
}

DataStream is a struct that defines a generic stream process stage. It manages one or more input channels (inStreams) and a shared error stream.

func Expand added in v0.0.7

func Expand[T any, U any](
	ds DataStream[T],
	expandFunc ExpandFunc[T, U],
	params ...Params,
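) DataStream[U]

Expand is a package-level function that expands each item of type T into multiple items of type U using an ExpandFunc.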

func Map

func Map[T any, U any](
	ds DataStream[T],
	transformFunc TransformFunc[T, U],
	params ...Params,
) DataStream[U]

Map is a package-level function that transforms each item from T to U using a TransformFunc.

func New

func New[T any](
	ctx context.Context,
	inStream <-chan T,
	errStream chan<- error,
) DataStream[T]

New constructs a new DataStream of a given type by passing in a context, an input channel, and an error channel. Additional channels can be introduced internally via transformations like FanOut.

func Window added in v0.0.8

func Window[T any, K comparable, R any](
	keyedDs KeyedDataStream[T, K],
	wf WindowFunc[T, R],
	partitioner Partitioner[T, K],
	param ...Params,
) DataStream[R]

Window applies a WindowFunc to a KeyedDataStream, producing a new DataStream[R] that carries the windowed results. It uses the provided Partitioner[T, K] and optional Params to define the windowing behavior.

func (DataStream[T]) Broadcast

func (p DataStream[T]) Broadcast(
	params ...Params,
) DataStream[T]

Broadcast sends each item to param.Num new channels, effectively duplicating every item across all output channels.

func (DataStream[T]) FanIn

func (p DataStream[T]) FanIn(
	params ...Params,
) DataStream[T]

FanIn merges a slice of input channels into a single output channel.

func (DataStream[T]) FanOut

func (p DataStream[T]) FanOut(
	params ...Params,
) DataStream[T]

FanOut duplicates the number of output channels by param.Num, distributing incoming items in a round-robin manner across all new channels.

func (DataStream[T]) Filter

func (p DataStream[T]) Filter(filter FilterFunc[T], params ...Params) DataStream[T]

Filter applies a user defined function to each value in the input stream(s) and only returns values that pass the filter check (true). If an error occurs, the item is dropped.

func (DataStream[T]) Listen added in v0.0.9

func (p DataStream[T]) Listen(
	listenerIndex int,
	params ...Params,
) DataStream[T]

Listen allows you to subscribe to a DataStream, effectively creating a new DataStream that listens to the same input channel. This allows you to broadcast the same input stream to multiple listeners.

func (DataStream[T]) OrDone

func (p DataStream[T]) OrDone(
	params ...Params,
) DataStream[T]

OrDone terminates if the input stream is closed or context is done, effectively passing items through until the upstream channel signals completion.

func (DataStream[T]) Out

func (p DataStream[T]) Out() <-chan T

Out returns the single output channel of this DataStream. If the DataStream has multiple input channels, it automatically FanIns them into a single output.

func (DataStream[T]) Run

func (p DataStream[T]) Run(
	proc ProcessFunc[T],
	params ...Params,
) DataStream[T]

Run executes a user defined process function on the input stream(s). Each input channel is handled in its own goroutine, writing processed results to a newly created set of output channels. Errors can be skipped if SkipError is set.

func (DataStream[T]) Sink

func (p DataStream[T]) Sink(
	sinker Sinker[T],
	params ...Params,
) DataStream[T]

Sink outputs DataStream values to a defined Sinker in a separate goroutine. This allows the pipeline to continue processing asynchronously.

func (DataStream[T]) Take

func (p DataStream[T]) Take(
	params ...Params,
) DataStream[T]

Take returns only the first N items from the input streams. If multiple input streams exist, each is read up to N items, meaning total items could be N * numberOfStreams.

func (DataStream[T]) Tee

func (p DataStream[T]) Tee(
	params ...Params,
) (DataStream[T], DataStream[T])

Tee splits values coming in from a single channel so that you can send them off into two separate DataStream outputs.

func (DataStream[T]) WithWaitGroup

func (p DataStream[T]) WithWaitGroup(wg *sync.WaitGroup) DataStream[T]

WithWaitGroup attaches a WaitGroup to this DataStream, returning a copy.

type Error

type Error struct {
	Code    ErrorCode
	Segment string
	Message string
}

Error defines a custom error type that records the ErrorCode, the segment name, and a message.

func (*Error) Error

func (e *Error) Error() string

Error implements the error interface.

type ErrorCode

type ErrorCode int

ErrorCode represents a generic error code that identifies the kind of DataStream operation that produced an error.

const (
	RUN ErrorCode = iota
	FILTER
	MAP
	SINK
	EXPAND
	KEY_BY
	WINDOW
)

func (ErrorCode) Message

func (w ErrorCode) Message(msg string, segment string) string

Message converts the ErrorCode enum into a human-readable message.

func (ErrorCode) String

func (w ErrorCode) String() string

String converts the ErrorCode enum into a string value.

type ExpandFunc added in v0.0.7

type ExpandFunc[T any, U any] func(T) ([]U, error)

ExpandFunc is a user defined function type used in a given DataStream stage. This function type is used to expand a given input into multiple outputs.

type FilterFunc

type FilterFunc[T any] func(T) (bool, error)

FilterFunc is a user defined function type used in a given DataStream stage. This function type is used to filter a given input type.

type KeyFunc added in v0.0.8

type KeyFunc[T any, K comparable] func(T) K

KeyFunc is a user defined function that takes a value of type T and returns a key of type K for the given value. This function is used to partition pipeline data streams by key.

type Keyable added in v0.0.10

type Keyable[K comparable] interface {
	Key() K
}

type KeyableElement added in v0.0.10

type KeyableElement[T any, K comparable] interface {
	Keyable[K]
	Value() T
}

func NewKeyedElement added in v0.0.10

func NewKeyedElement[T any, K comparable](key K, value T) KeyableElement[T, K]

NewKeyedElement creates a new KeyableElement with the given key and value.

type KeyedDataStream added in v0.0.8

type KeyedDataStream[T any, K comparable] struct {
	DataStream[KeyableElement[T, K]]
	// contains filtered or unexported fields
}

KeyedDataStream represents a stream of data elements partitioned by a key of type K, derived using a key function.

func KeyBy added in v0.0.8

func KeyBy[T any, K comparable](
	ds DataStream[T],
	keyFunc KeyFunc[T, K],
	params ...Params,
) KeyedDataStream[T, K]

KeyBy partitions a DataStream into multiple channels (one channel per computed key) so that all items with the same key travel through the same DataStream[T].

func (KeyedDataStream[T, K]) WithTimeMarker added in v0.0.8

func (k KeyedDataStream[T, K]) WithTimeMarker(
	timeMarker TimeMarker,
) KeyedDataStream[T, K]

WithTimeMarker attaches a time marker to the KeyedDataStream.

func (KeyedDataStream[T, K]) WithWatermarkGenerator added in v0.0.8

func (k KeyedDataStream[T, K]) WithWatermarkGenerator(
	waterMarker WatermarkGenerator[T],
) KeyedDataStream[T, K]

WithWatermarkGenerator attaches a watermark generator to the KeyedDataStream.

type Params

type Params struct {
	Num         int
	BufferSize  int
	SkipError   bool
	SegmentName string
	ShardCount  int
}

Params are used to pass args into DataStream methods.

type Partition added in v0.0.10

type Partition[T any, K comparable] interface {
	// Push sends a new element into the partition for processing.
	Push(item TimedKeyableElement[T, K])
}

Partition represents a single, active partition for a specific key. It is an active component responsible for its own logic.

type Partitioner added in v0.0.10

type Partitioner[T any, K comparable] interface {
	// Create initializes and starts a new, active Partition instance.
	// It takes a context for cancellation and an output channel where it will
	// send its results (e.g., slices of T for windows).
	Create(ctx context.Context, out chan<- []T) Partition[T, K]
	Closeable
}

Partitioner acts as a template or factory for creating new partitions. A single Partitioner (e.g., a configured Sliding window) is used to create all active partition instances for different keys.

TODO: Decouple Partitioning from Windowing (ref GitHub issue #61).

type ProcessFunc

type ProcessFunc[T any] func(T) (T, error)

ProcessFunc is a user defined function type used in a given DataStream stage

type ShardKeyFunc added in v0.0.10

type ShardKeyFunc[K comparable] func(k K, shardCount int) uint64

ShardKeyFunc defines the function signature for a sharding algorithm. It takes a key and the total number of shards and must return a deterministic shard index (typically in the range [0, shardCount-1]). Implementations can include modulus hashing, jump hashing, or other consistent hashing algorithms.

type ShardedStore added in v0.0.10

type ShardedStore[T any, K comparable] struct {
	*ShardedStoreOpts[K]
	// contains filtered or unexported fields
}

ShardedStore is a thread-safe, in-memory generic key-value store. It distributes data across a fixed number of internal shards to reduce lock contention and improve concurrency in high-throughput environments. The sharding strategy is determined by the provided ShardKeyFunc.
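The sharding idea can be sketched in a few lines. The type below is a simplified stand-in, not the package's ShardedStore: shardedMap and its methods are hypothetical, keys are restricted to strings, the shard index comes from FNV-1a modulo the shard count, and each shard is a sync.Map (the Keys documentation on this page mentions a sync.Map per shard).

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// shardedMap spreads string keys over a fixed number of sync.Map shards
// so that concurrent writers to different shards do not contend.
type shardedMap[V any] struct {
	shards []sync.Map
}

// newShardedMap creates a store; a shard count below one falls back to one.
func newShardedMap[V any](shardCount int) *shardedMap[V] {
	if shardCount < 1 {
		shardCount = 1
	}
	return &shardedMap[V]{shards: make([]sync.Map, shardCount)}
}

// shardIndex hashes the key with FNV-1a and reduces it modulo the shard count.
func (s *shardedMap[V]) shardIndex(key string) int {
	h := fnv.New64a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error.
	return int(h.Sum64() % uint64(len(s.shards)))
}

// Set stores v under key and returns the index of the shard used.
func (s *shardedMap[V]) Set(key string, v V) int {
	i := s.shardIndex(key)
	s.shards[i].Store(key, v)
	return i
}

// Get returns the value stored under key, or the zero value and false.
func (s *shardedMap[V]) Get(key string) (V, bool) {
	raw, ok := s.shards[s.shardIndex(key)].Load(key)
	if !ok {
		var zero V
		return zero, false
	}
	return raw.(V), true
}

func main() {
	store := newShardedMap[int](4)
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			store.Set(fmt.Sprintf("key-%d", i), i)
		}(i)
	}
	wg.Wait()
	v, ok := store.Get("key-3")
	fmt.Println(v, ok) // 3 true
}
```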

func NewShardedPartitionStore added in v0.0.10

func NewShardedPartitionStore[T any, K comparable](
	opts *ShardedStoreOpts[K],
) *ShardedStore[T, K]

NewShardedPartitionStore creates and initializes a new ShardedStore with the provided options. If the ShardCount in the options is zero, it defaults to 1 to ensure a functional store.

func (*ShardedStore[T, K]) Close added in v0.0.10

func (s *ShardedStore[T, K]) Close()

Close clears all entries in the ShardedStore. It iterates through all shards and deletes each key-value pair.

func (*ShardedStore[T, K]) Get added in v0.0.10

func (s *ShardedStore[T, K]) Get(k K) (Partition[T, K], bool)

Get retrieves a Partition[T, K] by its key K from the store. It returns the Partition and a boolean indicating whether the key was found. If the key does not exist, it returns a zero value of Partition[T, K] and false.

func (*ShardedStore[T, K]) Initialize added in v0.0.10

func (s *ShardedStore[T, K]) Initialize()

func (*ShardedStore[T, K]) Keys added in v0.0.10

func (s *ShardedStore[T, K]) Keys() []K

Keys returns a slice of all keys currently stored in the ShardedStore. It iterates through all shards and collects the keys from each shard's sync.Map.

func (*ShardedStore[T, K]) Set added in v0.0.10

func (s *ShardedStore[T, K]) Set(k K, v Partition[T, K]) (shardIndex int)

Set stores a Partition[T, K] under the key K in the store. It returns the index of the shard where the value was stored. The shard index is determined by the ShardKeyFunc provided in the store's options. If the key already exists, it updates the value in the corresponding shard.

type ShardedStoreOpts added in v0.0.10

type ShardedStoreOpts[K comparable] struct {
	// ShardCount is the total number of concurrent shards to create.
	// Must be greater than zero.
	ShardCount int
	// ShardKeyFunc is the hashing function used to map a key to a shard index.
	ShardKeyFunc ShardKeyFunc[K]
}

ShardedStoreOpts provides the configuration for creating a new ShardedStore.

type Sinker

type Sinker[T any] interface {
	Sink(ctx context.Context, ds DataStream[T]) error
}

Sinker is an interface that defines the Sink method

type Sourcer

type Sourcer[T any] interface {
	Source(ctx context.Context, errSender chan<- error) DataStream[T]
}

Sourcer is an interface that defines the Source method

type TimeMarker added in v0.0.10

type TimeMarker interface {
	Now() time.Time
}

type TimedKeyableElement added in v0.0.10

type TimedKeyableElement[T any, K comparable] interface {
	KeyableElement[T, K]
	Time() time.Time
}

func NewTimedKeyedElement added in v0.0.10

func NewTimedKeyedElement[T any, K comparable](v KeyableElement[T, K], t time.Time) TimedKeyableElement[T, K]

NewTimedKeyedElement creates a new TimedKeyableElement from the given KeyableElement (which carries the key and value) and time.

type TransformFunc

type TransformFunc[T any, U any] func(T) (U, error)

TransformFunc is a user defined function type used in a given DataStream stage. This function type is used to transform a given input type to a given output type.

type WatermarkGenerator added in v0.0.10

type WatermarkGenerator[T any] interface {
	// OnEvent is called whenever a new event arrives, allowing the generator to update its state.
	//  Event: the event that arrived
	//  EventTime: the time of the event
	OnEvent(event T, eventTime time.Time)

	// GetWatermark returns the current watermark.
	GetWatermark() time.Time
}

type WindowFunc added in v0.0.8

type WindowFunc[T any, R any] func([]T) (R, error)

WindowFunc processes a given batch of data and returns a result. You can use this function in conjunction with the KeyedDataStream to perform windowed operations on batches of data with a given key.

type WindowedPartitioner added in v0.0.10

type WindowedPartitioner[T any, K comparable] interface {
	// Create initializes and starts a new, active Partition instance.
	// It takes a context for cancellation and an output channel where it will
	// send its results (e.g., slices of T for windows).
	Create(ctx context.Context, out chan<- []T) Partition[T, K]
	Closeable
}

Directories

Path Synopsis
internal
