Documentation
¶
Overview ¶
Package gstream is a stream processing library that abstracts the pipeline pattern using generics.
Example (Stateful) ¶
ExampleStateful demonstrates stateful processing using a table, a stream-table join, and an aggregation.
package main import ( "context" "fmt" "github.com/KumKeeHyun/gstream/state" "sort" ) type Player struct { ID int Name string } type Product struct { ID int Name string } type ScoreEvent struct { PlayerID int ProductID int Score float64 } type ScoreWithPlayer struct { Player Player Score ScoreEvent } type Enriched struct { PlayerID int PlayerName string ProductID int ProductName string Score float64 } type HighScores struct { highScores []Enriched } func (s *HighScores) Add(e Enriched) { added := append(s.highScores, e) sort.SliceStable(added, func(i, j int) bool { return added[i].Score > added[j].Score }) if len(added) > 3 { added = added[:3] } s.highScores = added } // ExampleStateful demonstrates a stateful processing using table, join, aggregate. func main() { playerCh := make(chan Player) productCh := make(chan Product) scoreCh := make(chan ScoreEvent) // Create producer emitting players, products, scores go func() { defer func() { close(playerCh) close(productCh) close(scoreCh) }() for i := 1; i <= 3; i++ { playerCh <- Player{ ID: i, Name: fmt.Sprintf("player-%d", i), } } for i := 1; i <= 2; i++ { productCh <- Product{ ID: i, Name: fmt.Sprintf("product-%d", i), } } scoreCh <- ScoreEvent{ PlayerID: 1, ProductID: 1, Score: 0.6, } scoreCh <- ScoreEvent{ PlayerID: 2, ProductID: 1, Score: 0.5, } scoreCh <- ScoreEvent{ PlayerID: 3, ProductID: 1, Score: 0.7, } scoreCh <- ScoreEvent{ PlayerID: 2, ProductID: 2, Score: 0.8, } scoreCh <- ScoreEvent{ PlayerID: 1, ProductID: 1, Score: 0.8, } scoreCh <- ScoreEvent{ PlayerID: 3, ProductID: 2, Score: 0.4, } scoreCh <- ScoreEvent{ PlayerID: 2, ProductID: 1, Score: 0.9, } }() b := NewBuilder() players := Table[int, Player](b).From( playerCh, func(p Player) int { return p.ID }, state.NewOptions[int, Player](), ) products := Table[int, Product](b).From( productCh, func(p Product) int { return p.ID }, state.NewOptions[int, Product](), ) scores := SelectKey[int, ScoreEvent]( Stream[ScoreEvent](b).From(scoreCh), func(s ScoreEvent) 
int { return s.PlayerID }, ) // ScoreEvent join with player by playerID withPlayers := JoinStreamTable( scores, players, func(id int, score ScoreEvent, player Player) ScoreWithPlayer { return ScoreWithPlayer{ Player: player, Score: score, } }, ) // Group by productID groupByProduct := GroupBy( withPlayers, func(playerID int, withPlayer ScoreWithPlayer) int { return withPlayer.Score.ProductID }, ) // withPlayer join with product by productID withProducts := JoinStreamTable( groupByProduct, products, func(playerID int, withPlayer ScoreWithPlayer, product Product) Enriched { return Enriched{ PlayerID: withPlayer.Player.ID, PlayerName: withPlayer.Player.Name, ProductID: product.ID, ProductName: product.Name, Score: withPlayer.Score.Score, } }, ) // Perform the aggregation Aggregate[int, Enriched, *HighScores]( withProducts, func() *HighScores { return &HighScores{highScores: []Enriched{}} }, func(kv KeyValue[int, Enriched], aggregate *HighScores) *HighScores { aggregate.Add(kv.Value) return aggregate }, state.NewOptions[int, *HighScores](), ). ToValueStream(). Foreach(func(_ context.Context, hs *HighScores) { fmt.Println(hs.highScores) }) b.BuildAndStart(context.Background()) }
Output: [{1 player-1 1 product-1 0.6}] [{1 player-1 1 product-1 0.6} {2 player-2 1 product-1 0.5}] [{3 player-3 1 product-1 0.7} {1 player-1 1 product-1 0.6} {2 player-2 1 product-1 0.5}] [{2 player-2 2 product-2 0.8}] [{1 player-1 1 product-1 0.8} {3 player-3 1 product-1 0.7} {1 player-1 1 product-1 0.6}] [{2 player-2 2 product-2 0.8} {3 player-3 2 product-2 0.4}] [{2 player-2 1 product-1 0.9} {1 player-1 1 product-1 0.8} {3 player-3 1 product-1 0.7}]
Example (Stateless) ¶
ExampleStateless demonstrates stateless processing using a stream, filter, map, merge, and flat map.
package main import ( "context" "fmt" ) type Tweet struct { ID int Lang string Text string } type Sentiment struct { ID int Text string Score float64 } // ExampleStateful demonstrates a stateless processing using stream, filter, map, merge func main() { tweetCh := make(chan Tweet) // Create producer emitting tweets go func() { defer close(tweetCh) for i := 0; i < 3; i++ { tweetCh <- Tweet{ ID: i, Lang: "en", Text: fmt.Sprintf("some text %d", i), } tweetCh <- Tweet{ ID: i + 10, Lang: "kr", Text: fmt.Sprintf("썸 텍스트 %d", i), } } }() b := NewBuilder() tweets := Stream[Tweet](b).From(tweetCh) // Branch into english english := tweets.Filter(func(t Tweet) bool { return t.Lang == "en" }) // Branch into korean and translate translate := tweets.Filter(func(t Tweet) bool { return t.Lang == "kr" }).Map(func(ctx context.Context, t Tweet) Tweet { // Translate t.Text to English return Tweet{ ID: t.ID, Lang: "en", Text: fmt.Sprintf("translated text %d", t.ID), } }) // Merge english and translate branch merged := english.Merge(translate) // Enrich tweet sentiment := FlatMap(merged, func(ctx context.Context, t Tweet) []Sentiment { // Calculate sentiment score of t.Text and enrich tweet return []Sentiment{ { ID: t.ID, Text: t.Text, Score: 0.5, }, } }) // Print sentiment sentiment.Foreach(func(_ context.Context, s Sentiment) { fmt.Println(s) }) b.BuildAndStart(context.Background()) }
Output: {0 some text 0 0.5} {10 translated text 10 0.5} {1 some text 1 0.5} {11 translated text 11 0.5} {2 some text 2 0.5} {12 translated text 12 0.5}
Index ¶
- func FlatMapErr[T, TR any](s GStream[T], flatMapper func(context.Context, T) ([]TR, error)) (ss GStream[TR], fs FailedGStream[T])
- func JoinStreamTableErr[K, V, VO, VR any](s KeyValueGStream[K, V], t GTable[K, VO], joiner func(K, V, VO) (VR, error)) (rs KeyValueGStream[K, VR], fs FailedKeyValueGStream[K, V])
- func KVFlatMapErr[K, V, KR, VR any](kvs KeyValueGStream[K, V], ...) (KeyValueGStream[KR, VR], FailedKeyValueGStream[K, V])
- func KVFlatMapValuesErr[K, V, VR any](kvs KeyValueGStream[K, V], flatMapper func(context.Context, V) ([]VR, error)) (KeyValueGStream[K, VR], FailedKeyValueGStream[K, V])
- func KVMapErr[K, V, KR, VR any](kvs KeyValueGStream[K, V], ...) (KeyValueGStream[KR, VR], FailedKeyValueGStream[K, V])
- func KVMapValuesErr[K, V, VR any](kvs KeyValueGStream[K, V], mapper func(context.Context, V) (VR, error)) (KeyValueGStream[K, VR], FailedKeyValueGStream[K, V])
- func MapErr[T, TR any](s GStream[T], mapper func(context.Context, T) (TR, error)) (ss GStream[TR], fs FailedGStream[T])
- func NewBuilder() *builder
- func Stream[T any](b *builder) *streamBuilder[T]
- func Table[K, V any](b *builder) *tableBuilder[K, V]
- type Change
- type Closer
- type Fail
- type FailedGStream
- type FailedKeyValueGStream
- type GStream
- type GTable
- type KeyValue
- type KeyValueGStream
- func GroupBy[K, V, KR any](kvs KeyValueGStream[K, V], keyMapper func(K, V) KR) KeyValueGStream[KR, V]
- func JoinStreamTable[K, V, VO, VR any](s KeyValueGStream[K, V], t GTable[K, VO], joiner func(K, V, VO) VR) KeyValueGStream[K, VR]
- func KVFlatMap[K, V, KR, VR any](kvs KeyValueGStream[K, V], ...) KeyValueGStream[KR, VR]
- func KVFlatMapValues[K, V, VR any](kvs KeyValueGStream[K, V], flatMapper func(context.Context, V) []VR) KeyValueGStream[K, VR]
- func KVMap[K, V, KR, VR any](kvs KeyValueGStream[K, V], ...) KeyValueGStream[KR, VR]
- func KVMapValues[K, V, VR any](kvs KeyValueGStream[K, V], mapper func(context.Context, V) VR) KeyValueGStream[K, VR]
- func SelectKey[K, V any](s GStream[V], keySelecter func(V) K) KeyValueGStream[K, V]
- type Processor
- type ProcessorSupplier
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FlatMapErr ¶
func FlatMapErr[T, TR any](s GStream[T], flatMapper func(context.Context, T) ([]TR, error)) (ss GStream[TR], fs FailedGStream[T])
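FlatMapErr transforms each record into zero or more records using a flatMapper that may return an error (for example, one that performs side effects). The first return value is a stream of the successfully produced records; the second is a stream of the input records whose flatMapper call failed, together with the error.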
func JoinStreamTableErr ¶
func JoinStreamTableErr[K, V, VO, VR any](s KeyValueGStream[K, V], t GTable[K, VO], joiner func(K, V, VO) (VR, error)) (rs KeyValueGStream[K, VR], fs FailedKeyValueGStream[K, V])
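JoinStreamTableErr joins each stream record with the table record that has the same key (inner join), using a joiner that may return an error (for example, one that performs side effects). Records whose joiner call fails are emitted on the second return value.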
func KVFlatMapErr ¶
func KVFlatMapErr[K, V, KR, VR any](kvs KeyValueGStream[K, V], flatMapper func(context.Context, KeyValue[K, V]) ([]KeyValue[KR, VR], error)) (KeyValueGStream[KR, VR], FailedKeyValueGStream[K, V])
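KVFlatMapErr transforms each key-value record into zero or more key-value records using a flatMapper that may return an error (for example, one that performs side effects). It is the KeyValue version of FlatMapErr.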
func KVFlatMapValuesErr ¶
func KVFlatMapValuesErr[K, V, VR any](kvs KeyValueGStream[K, V], flatMapper func(context.Context, V) ([]VR, error)) (KeyValueGStream[K, VR], FailedKeyValueGStream[K, V])
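KVFlatMapValuesErr transforms the value of each record into zero or more values, keeping the record's key, using a flatMapper that may return an error (for example, one that performs side effects).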
func KVMapErr ¶
func KVMapErr[K, V, KR, VR any](kvs KeyValueGStream[K, V], mapper func(context.Context, KeyValue[K, V]) (KeyValue[KR, VR], error)) (KeyValueGStream[KR, VR], FailedKeyValueGStream[K, V])
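KVMapErr transforms each key-value record into a new key-value record using a mapper that may return an error (for example, one that performs side effects). It is the KeyValue version of MapErr.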
func KVMapValuesErr ¶
func KVMapValuesErr[K, V, VR any](kvs KeyValueGStream[K, V], mapper func(context.Context, V) (VR, error)) (KeyValueGStream[K, VR], FailedKeyValueGStream[K, V])
KVMapValuesErr transforms the value of each record into a new value, keeping the record's key, using a mapper that may return an error (for example, one that performs side effects).
func MapErr ¶
func MapErr[T, TR any](s GStream[T], mapper func(context.Context, T) (TR, error)) (ss GStream[TR], fs FailedGStream[T])
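MapErr transforms each record into a new record using a mapper that may return an error (for example, one that performs side effects).
The first return value is a stream of the successfully mapped values. The second return value is a stream of the records that failed to be mapped, each paired with the error returned by the mapper.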
func NewBuilder ¶
func NewBuilder() *builder
NewBuilder creates a new builder. The builder is the entry point for the DSL: pass it to Stream or Table to start defining a pipeline.
b := gstream.NewBuilder() gstream.Stream[int](b) gstream.Table[int, string](b)
Types ¶
type FailedGStream ¶
type FailedKeyValueGStream ¶
type FailedKeyValueGStream[K, V any] interface { Filter(func(KeyValue[K, V], error) bool) FailedKeyValueGStream[K, V] Foreach(func(context.Context, KeyValue[K, V], error)) ToStream() KeyValueGStream[K, V] }
type GStream ¶
type GStream[T any] interface { Filter(func(T) bool) GStream[T] Foreach(func(context.Context, T)) Map(func(context.Context, T) T) GStream[T] MapErr(func(context.Context, T) (T, error)) (GStream[T], FailedGStream[T]) FlatMap(func(context.Context, T) []T) GStream[T] FlatMapErr(func(context.Context, T) ([]T, error)) (GStream[T], FailedGStream[T]) // Merge merge two streams into one. // If two streams are in a different pipeline, a new pipeline is created. Merge(GStream[T], ...pipe.Option) GStream[T] // Pipe creates a new pipeline. // Downstream are processed in the new pipeline. Pipe(...pipe.Option) GStream[T] // To emits records to returned sink channel. To(...sink.Option) <-chan T }
GStream is the value-stream interface of the DSL. It can be converted to a KeyValueGStream with the SelectKey function.
type GTable ¶
type GTable[K, V any] interface { // ToValueStream convert this table to GStream. // GTable[K, V] -> GStream[V] ToValueStream() GStream[V] // ToStream convert this table to KeyValueGStream. // GTable[K, V] -> KeyValueGStream[K, V] ToStream() KeyValueGStream[K, V] }
GTable is the table interface of the DSL.
type KeyValueGStream ¶
type KeyValueGStream[K, V any] interface { Filter(func(KeyValue[K, V]) bool) KeyValueGStream[K, V] Foreach(func(context.Context, KeyValue[K, V])) Map(func(context.Context, KeyValue[K, V]) KeyValue[K, V]) KeyValueGStream[K, V] MapErr(func(context.Context, KeyValue[K, V]) (KeyValue[K, V], error)) (KeyValueGStream[K, V], FailedKeyValueGStream[K, V]) MapValues(func(context.Context, V) V) KeyValueGStream[K, V] MapValuesErr(func(context.Context, V) (V, error)) (KeyValueGStream[K, V], FailedKeyValueGStream[K, V]) FlatMap(func(context.Context, KeyValue[K, V]) []KeyValue[K, V]) KeyValueGStream[K, V] FlatMapErr(func(context.Context, KeyValue[K, V]) ([]KeyValue[K, V], error)) (KeyValueGStream[K, V], FailedKeyValueGStream[K, V]) FlatMapValues(func(context.Context, V) []V) KeyValueGStream[K, V] FlatMapValuesErr(func(context.Context, V) ([]V, error)) (KeyValueGStream[K, V], FailedKeyValueGStream[K, V]) // Merge merge two streams into one. // If two streams are in a different pipeline, a new pipeline is created. Merge(KeyValueGStream[K, V], ...pipe.Option) KeyValueGStream[K, V] // Pipe creates a new pipeline. // Subgraph nodes are processed in the new pipeline. Pipe(...pipe.Option) KeyValueGStream[K, V] // To emits records to returned sink channel. To(...sink.Option) <-chan KeyValue[K, V] // ToValueStream convert this stream to a value stream. // KeyValueGStream[K, V] -> GStream[V] ToValueStream() GStream[V] // ToTable convert this stream to a table. ToTable(state.Options[K, V]) GTable[K, V] }
KeyValueGStream is the key-value stream interface of the DSL. It can be converted to a GStream with the ToValueStream method, or to a GTable with the ToTable method.
func GroupBy ¶
func GroupBy[K, V, KR any](kvs KeyValueGStream[K, V], keyMapper func(K, V) KR) KeyValueGStream[KR, V]
GroupBy groups records by a new key, which keyMapper computes from each record's key and value.
func JoinStreamTable ¶
func JoinStreamTable[K, V, VO, VR any](s KeyValueGStream[K, V], t GTable[K, VO], joiner func(K, V, VO) VR) KeyValueGStream[K, VR]
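JoinStreamTable joins each stream record with the table record that has the same key, using an inner join. The joiner combines the key, the stream value, and the table value into the result value.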
func KVFlatMap ¶
func KVFlatMap[K, V, KR, VR any](kvs KeyValueGStream[K, V], flatMapper func(context.Context, KeyValue[K, V]) []KeyValue[KR, VR]) KeyValueGStream[KR, VR]
KVFlatMap transforms each key-value record into zero or more key-value records. It is the KeyValue version of FlatMap.
func KVFlatMapValues ¶
func KVFlatMapValues[K, V, VR any](kvs KeyValueGStream[K, V], flatMapper func(context.Context, V) []VR) KeyValueGStream[K, VR]
KVFlatMapValues transforms the value of each record into zero or more values, keeping the record's key.
func KVMap ¶
func KVMap[K, V, KR, VR any](kvs KeyValueGStream[K, V], mapper func(context.Context, KeyValue[K, V]) KeyValue[KR, VR]) KeyValueGStream[KR, VR]
KVMap transforms each key-value record into a new key-value record. It is the KeyValue version of Map.
func KVMapValues ¶
func KVMapValues[K, V, VR any](kvs KeyValueGStream[K, V], mapper func(context.Context, V) VR) KeyValueGStream[K, VR]
KVMapValues transforms the value of each record into a new value, keeping the record's key.
func SelectKey ¶
func SelectKey[K, V any](s GStream[V], keySelecter func(V) K) KeyValueGStream[K, V]
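SelectKey sets a new key for each record in a GStream, using keySelecter to compute it from the record's value, and returns the result as a KeyValueGStream.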