Documentation ¶
Index ¶
- func Bind[In, Bound, Out any](left stream.Processor[In, Bound], right stream.Processor[Bound, Out]) stream.Processor[In, Out]
- func Filter[Msg any](fn Predicate[Msg]) stream.Processor[Msg, Msg]
- func ForEach[Msg any](fn ForEachFunc[Msg]) stream.Processor[Msg, Msg]
- func Forward[Msg any](c *context.Context[Msg, Msg])
- func Generate[Msg any](gen Generator[Msg]) stream.Processor[stream.Source, Msg]
- func GenerateFrom[Msg any](ch <-chan Msg) stream.Processor[stream.Source, Msg]
- func GroupBy[Msg any, Key comparable](groupKey GroupSelector[Msg, Key]) stream.Processor[Msg, *Grouped[Msg, Key]]
- func GroupedKey[Msg any, Key comparable](m *Grouped[Msg, Key]) Key
- func GroupedMessage[Msg any, Key comparable](m *Grouped[Msg, Key]) Msg
- func Join[Left, Right, Out any](left stream.Processor[stream.Source, Left], ...) stream.Processor[stream.Source, Out]
- func Map[From, To any](fn Mapper[From, To]) stream.Processor[From, To]
- func Merge[Out any](p ...stream.Processor[stream.Source, Out]) stream.Processor[stream.Source, Out]
- func Reduce[In, Out any](fn Reducer[Out, In]) stream.Processor[In, Out]
- func ReduceFrom[In, Out any](fn Reducer[Out, In], init Out) stream.Processor[In, Out]
- func SidechainTo[Msg any](ch chan<- Msg) stream.Processor[Msg, Msg]
- func Sink[Msg any]() stream.Processor[Msg, stream.Sink]
- func SinkInto[Msg any](ch chan<- Msg) stream.Processor[Msg, stream.Sink]
- func Split[In, Out any](p ...stream.Processor[In, Out]) stream.Processor[In, stream.Sink]
- func Subprocess[Msg any](p ...stream.Processor[Msg, Msg]) stream.Processor[Msg, Msg]
- func TableLookup[Msg any, Key comparable, Value any](t table.Table[Key, Value], c table.ColumnName, k table.KeySelector[Msg, Key]) (stream.Processor[Msg, Value], error)
- func TableUpdater[Msg any, Key comparable, Value any](t table.Updater[Msg, Key, Value]) stream.Processor[Msg, Msg]
- func TopicConsumer[Msg any](t topic.Topic[Msg]) stream.Processor[stream.Source, Msg]
- func TopicProducer[Msg any](t topic.Topic[Msg]) stream.Processor[Msg, Msg]
- type BinaryOperator
- type BinaryPredicate
- type ForEachFunc
- type Generator
- type GroupSelector
- type Grouped
- type Mapper
- type Predicate
- type Reducer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Bind ¶
func Bind[In, Bound, Out any]( left stream.Processor[In, Bound], right stream.Processor[Bound, Out], ) stream.Processor[In, Out]
Bind connects the output of the left Processor to the input of the right Processor and returns a new Processor that performs the handoff. If the left Processor reports an error, the handoff is short-circuited and the error is reported downstream.
Processor[In, Out] = Processor[In, Bound] -> Processor[Bound, Out]
In is the input type of the left Processor. Bound is both the output type of the left Processor and the input type of the right Processor. Out is the output type of the right Processor.
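The composition and error short-circuit described above can be sketched without the library. This is a minimal illustration, not the package's implementation: the `Processor` type here is a hypothetical stand-in (a plain function returning a value and an error), and `parse` and `double` are made-up steps.

```go
package main

import (
	"fmt"
	"strconv"
)

// Processor is a hypothetical stand-in for stream.Processor: a function
// that turns one input into one output or reports an error.
type Processor[In, Out any] func(In) (Out, error)

// Bind composes left and right. If left reports an error, right is never
// called and the error is passed on unchanged.
func Bind[In, Bound, Out any](left Processor[In, Bound], right Processor[Bound, Out]) Processor[In, Out] {
	return func(in In) (Out, error) {
		bound, err := left(in)
		if err != nil {
			var zero Out
			return zero, err
		}
		return right(bound)
	}
}

// parse and double are illustrative steps, not part of the package.
func parse(s string) (int, error) { return strconv.Atoi(s) }

func double(n int) (int, error) { return n * 2, nil }

func main() {
	p := Bind[string, int, int](parse, double)

	n, err := p("21")
	fmt.Println(n, err) // 42 <nil>

	_, err = p("oops")
	fmt.Println(err != nil) // true
}
```

Here In is string, Bound is int, and Out is int, so the bound type never appears in the signature of the composed Processor.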
func Filter ¶
func Filter[Msg any](fn Predicate[Msg]) stream.Processor[Msg, Msg]
Filter constructs a Processor that forwards a message only if the provided function returns true for it.
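As a rough illustration of the filtering behavior, the sketch below applies a predicate to messages on a channel. It does not use the package: `filter`, `source`, and `collect` are hypothetical helpers, and the `Predicate` signature shown is an assumption, since the type definition is not listed here.

```go
package main

import "fmt"

// Predicate reuses the name from the index; this signature is an assumption.
type Predicate[Msg any] func(Msg) bool

// filter is a hypothetical stand-in for a Filter processor: it forwards
// only the messages for which keep returns true.
func filter[Msg any](in <-chan Msg, keep Predicate[Msg]) <-chan Msg {
	out := make(chan Msg)
	go func() {
		defer close(out)
		for m := range in {
			if keep(m) {
				out <- m
			}
		}
	}()
	return out
}

// source returns a closed, pre-filled channel holding msgs.
func source[Msg any](msgs ...Msg) <-chan Msg {
	out := make(chan Msg, len(msgs))
	for _, m := range msgs {
		out <- m
	}
	close(out)
	return out
}

// collect drains a channel into a slice.
func collect[Msg any](in <-chan Msg) []Msg {
	var all []Msg
	for m := range in {
		all = append(all, m)
	}
	return all
}

func main() {
	var isEven Predicate[int] = func(n int) bool { return n%2 == 0 }
	fmt.Println(collect(filter(source(1, 2, 3, 4, 5, 6), isEven))) // [2 4 6]
}
```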
func ForEach ¶
func ForEach[Msg any](fn ForEachFunc[Msg]) stream.Processor[Msg, Msg]
ForEach constructs a processor that performs an action on each message it sees using the provided function, and then forwards the message unchanged.
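The "act, then forward" behavior can be pictured with a small sketch. The `forEach` helper below is hypothetical and works on single values rather than on a real stream; only the `ForEachFunc` signature is taken from this page.

```go
package main

import "fmt"

// ForEachFunc matches the signature documented on this page.
type ForEachFunc[Msg any] func(Msg)

// forEach is a hypothetical stand-in: it runs fn for its side effect and
// returns the message untouched so it can continue downstream.
func forEach[Msg any](fn ForEachFunc[Msg]) func(Msg) Msg {
	return func(m Msg) Msg {
		fn(m)
		return m
	}
}

func main() {
	seen := 0
	var count ForEachFunc[string] = func(string) { seen++ }
	step := forEach(count)

	out := step("a")
	fmt.Println(out, seen) // a 1
}
```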
func GenerateFrom ¶
func GenerateFrom[Msg any](ch <-chan Msg) stream.Processor[stream.Source, Msg]
func GroupBy ¶
func GroupBy[Msg any, Key comparable]( groupKey GroupSelector[Msg, Key], ) stream.Processor[Msg, *Grouped[Msg, Key]]
func GroupedKey ¶
func GroupedKey[Msg any, Key comparable](m *Grouped[Msg, Key]) Key
func GroupedMessage ¶
func GroupedMessage[Msg any, Key comparable](m *Grouped[Msg, Key]) Msg
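GroupBy, GroupedKey, and GroupedMessage have no description here, so the following is only a guess at how they fit together: a selector computes a key, the message and key travel together in a Grouped value, and the two accessors read them back. The field names and the `groupOne` helper are hypothetical; the real Grouped type has unexported fields that are not shown on this page.

```go
package main

import "fmt"

// GroupSelector matches the signature documented on this page.
type GroupSelector[Msg any, Key comparable] func(Msg) Key

// Grouped pairs a message with its key. The field names are assumptions.
type Grouped[Msg any, Key comparable] struct {
	key Key
	msg Msg
}

// GroupedKey returns the key stored in m.
func GroupedKey[Msg any, Key comparable](m *Grouped[Msg, Key]) Key { return m.key }

// GroupedMessage returns the original message stored in m.
func GroupedMessage[Msg any, Key comparable](m *Grouped[Msg, Key]) Msg { return m.msg }

// groupOne is a hypothetical helper that tags one message with its key.
func groupOne[Msg any, Key comparable](msg Msg, sel GroupSelector[Msg, Key]) *Grouped[Msg, Key] {
	return &Grouped[Msg, Key]{key: sel(msg), msg: msg}
}

func main() {
	var byLength GroupSelector[string, int] = func(s string) int { return len(s) }
	g := groupOne("kiwi", byLength)
	fmt.Println(GroupedKey(g), GroupedMessage(g)) // 4 kiwi
}
```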
func Join ¶
func Join[Left, Right, Out any]( left stream.Processor[stream.Source, Left], right stream.Processor[stream.Source, Right], predicate BinaryPredicate[Left, Right], joiner BinaryOperator[Left, Right, Out], ) stream.Processor[stream.Source, Out]
Join accepts two Processors and joins their results using the provided BinaryPredicate and BinaryOperator. If the predicate returns false, nothing is forwarded. Otherwise, the two processed messages are combined using the joiner function, and the result is forwarded.
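The predicate-then-combine step can be sketched for a single pair of messages. This is an illustration under assumptions: `joinOne`, `user`, and `order` are hypothetical, and the `BinaryPredicate` signature is inferred from how Join uses it, since its definition is not shown on this page.

```go
package main

import "fmt"

// BinaryPredicate's signature is an assumption; BinaryOperator matches
// the signature documented on this page.
type BinaryPredicate[Left, Right any] func(Left, Right) bool
type BinaryOperator[Left, Right, Out any] func(Left, Right) Out

// joinOne is a hypothetical helper: it combines l and r only when pred
// accepts the pair. The bool result reports whether anything is forwarded.
func joinOne[L, R, Out any](l L, r R, pred BinaryPredicate[L, R], op BinaryOperator[L, R, Out]) (Out, bool) {
	if !pred(l, r) {
		var zero Out
		return zero, false
	}
	return op(l, r), true
}

// user and order are made-up message types for the example.
type user struct {
	ID   int
	Name string
}

type order struct {
	UserID int
	Item   string
}

func main() {
	var sameUser BinaryPredicate[user, order] = func(u user, o order) bool { return u.ID == o.UserID }
	var describe BinaryOperator[user, order, string] = func(u user, o order) string {
		return u.Name + " bought " + o.Item
	}

	out, ok := joinOne(user{1, "Ada"}, order{1, "tea"}, sameUser, describe)
	fmt.Println(out, ok) // Ada bought tea true

	_, ok = joinOne(user{1, "Ada"}, order{2, "ink"}, sameUser, describe)
	fmt.Println(ok) // false
}
```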
func Map ¶
func Map[From, To any](fn Mapper[From, To]) stream.Processor[From, To]
Map constructs a processor that maps each message it sees into a new message using the provided function.
func Merge ¶
func Merge[Out any]( p ...stream.Processor[stream.Source, Out], ) stream.Processor[stream.Source, Out]
Merge forwards results from multiple Processors to the same channel.
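Forwarding several sources to one channel is commonly written as a fan-in. The sketch below shows that general pattern using plain channels and `sync.WaitGroup`; the `merge` and `source` helpers are hypothetical, and nothing here claims to mirror how the package implements Merge.

```go
package main

import (
	"fmt"
	"sync"
)

// merge is a hypothetical fan-in: every message from every input is
// forwarded to one output channel, which closes once all inputs are done.
func merge[Out any](ins ...<-chan Out) <-chan Out {
	out := make(chan Out)
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(c <-chan Out) {
			defer wg.Done()
			for m := range c {
				out <- m
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// source returns a closed, pre-filled channel holding msgs.
func source[Msg any](msgs ...Msg) <-chan Msg {
	out := make(chan Msg, len(msgs))
	for _, m := range msgs {
		out <- m
	}
	close(out)
	return out
}

func main() {
	sum, count := 0, 0
	for n := range merge(source(1, 2), source(3, 4)) {
		sum += n
		count++
	}
	// Arrival order across inputs is not deterministic, so only the
	// totals are printed.
	fmt.Println(sum, count) // 10 4
}
```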
func Reduce ¶
func Reduce[In, Out any](fn Reducer[Out, In]) stream.Processor[In, Out]
Reduce constructs a processor that reduces the messages it sees into an aggregated message, based on the provided function.
func ReduceFrom ¶
func ReduceFrom[In, Out any](fn Reducer[Out, In], init Out) stream.Processor[In, Out]
ReduceFrom constructs a processor that reduces the messages it sees into an aggregated message, based on the provided function and an initial message.
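One way to read the difference between Reduce and ReduceFrom is that the latter seeds the accumulator. The sketch below assumes a `Reducer` of the form `func(acc Out, msg In) Out` and assumes a running aggregate is produced after every message; neither detail is confirmed on this page, and `reduceFrom` is a hypothetical helper.

```go
package main

import "fmt"

// Reducer's signature is an assumption: accumulator first, message second,
// following the Reducer[Out, In] parameter order used on this page.
type Reducer[Out, In any] func(acc Out, msg In) Out

// reduceFrom is a hypothetical stand-in: it keeps an accumulator that
// starts at init and returns the updated aggregate for each message.
func reduceFrom[In, Out any](fn Reducer[Out, In], init Out) func(In) Out {
	acc := init
	return func(msg In) Out {
		acc = fn(acc, msg)
		return acc
	}
}

func main() {
	var add Reducer[int, int] = func(acc, n int) int { return acc + n }
	total := reduceFrom(add, 100)

	first := total(1)
	second := total(2)
	fmt.Println(first, second) // 101 103
}
```

Under the same assumptions, a Reduce equivalent would start from the zero value of Out instead of a caller-supplied init.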
func SidechainTo ¶
func SidechainTo[Msg any](ch chan<- Msg) stream.Processor[Msg, Msg]
func Subprocess ¶
func Subprocess[Msg any](p ...stream.Processor[Msg, Msg]) stream.Processor[Msg, Msg]
Subprocess is the internal implementation of a Subprocess.
func TableLookup ¶
func TableLookup[Msg any, Key comparable, Value any]( t table.Table[Key, Value], c table.ColumnName, k table.KeySelector[Msg, Key], ) (stream.Processor[Msg, Value], error)
TableLookup performs a lookup on a table using the provided message. The KeySelector extracts a Key from the message, and that Key is used to perform the lookup against the Table. The Column returned by the lookup is forwarded to the next Processor.
func TableUpdater ¶
func TableUpdater[Msg any, Key comparable, Value any]( t table.Updater[Msg, Key, Value], ) stream.Processor[Msg, Msg]
TableUpdater constructs a processor that sends all messages it sees to the provided table Updater.
func TopicConsumer ¶
func TopicConsumer[Msg any](t topic.Topic[Msg]) stream.Processor[stream.Source, Msg]
TopicConsumer constructs a processor that receives from the provided Topic every time it is invoked by the Stream.
Types ¶
type BinaryOperator ¶
type BinaryOperator[Left, Right, Out any] func(Left, Right) Out
BinaryOperator combines the left and right messages into some new result.
type BinaryPredicate ¶
BinaryPredicate is the signature for a function that can perform Stream joining. Returning true binds the two messages in the Stream.
type ForEachFunc ¶
type ForEachFunc[Msg any] func(Msg)
ForEachFunc is the signature for a function that can perform some action on the incoming messages of a Stream.
type GroupSelector ¶
type GroupSelector[Msg any, Key comparable] func(Msg) Key
type Grouped ¶
type Grouped[Msg any, Key comparable] struct { // contains filtered or unexported fields }
type Mapper ¶
type Mapper[From, To any] func(From) To
Mapper is the signature for a function that can perform Stream mapping. The message that is returned will be passed downstream.