node

package
v0.0.0-...-dd0f3b3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2023 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Bind

func Bind[In, Bound, Out any](
	left stream.Processor[In, Bound],
	right stream.Processor[Bound, Out],
) stream.Processor[In, Out]

Bind the output of the left Processor to the input of the right Processor, returning a new Processor that performs the handoff. If an error is reported by the left Processor, the handoff will be short-circuited and the error will be reported downstream.

Processor[In, Out] = Processor[In, Bound] -> Processor[Bound, Out]

In is the input type of the left Processor, Bound is the type of the left Processor's output as well as the input type of the right Processor. Out is the type of the right Processor's output

func Filter

func Filter[Msg any](fn Predicate[Msg]) stream.Processor[Msg, Msg]

Filter constructs a Processor that will only forward its messages if the provided function returns true

func ForEach

func ForEach[Msg any](fn ForEachFunc[Msg]) stream.Processor[Msg, Msg]

ForEach constructs a processor that performs an action on the messages it sees using the provided function, and then forwards the message

func Forward

func Forward[Msg any](c *context.Context[Msg, Msg])

func Generate

func Generate[Msg any](
	gen Generator[Msg],
) stream.Processor[stream.Source, Msg]

func GenerateFrom

func GenerateFrom[Msg any](
	ch <-chan Msg,
) stream.Processor[stream.Source, Msg]

func GroupBy

func GroupBy[Msg any, Key comparable](
	groupKey GroupSelector[Msg, Key],
) stream.Processor[Msg, *Grouped[Msg, Key]]

func GroupedKey

func GroupedKey[Msg any, Key comparable](m *Grouped[Msg, Key]) Key

func GroupedMessage

func GroupedMessage[Msg any, Key comparable](m *Grouped[Msg, Key]) Msg

func Join

func Join[Left, Right, Out any](
	left stream.Processor[stream.Source, Left],
	right stream.Processor[stream.Source, Right],
	predicate BinaryPredicate[Left, Right],
	joiner BinaryOperator[Left, Right, Out],
) stream.Processor[stream.Source, Out]

Join accepts two Processors for the sake of joining their results based on a provided BinaryPredicate and BinaryOperator. If the predicate fails, nothing is forwarded, otherwise the two processed messages are combined using the join function, and the result is forwarded

func Map

func Map[From, To any](fn Mapper[From, To]) stream.Processor[From, To]

Map constructs a processor that maps the messages it sees into new messages using the provided function

func Merge

func Merge[Out any](
	p ...stream.Processor[stream.Source, Out],
) stream.Processor[stream.Source, Out]

Merge forwards results from multiple Processors to the same channel

func Reduce

func Reduce[In, Out any](
	fn Reducer[Out, In],
) stream.Processor[In, Out]

Reduce constructs a processor that reduces the messages it sees into some form of aggregated messages, based on the provided function

func ReduceFrom

func ReduceFrom[In, Out any](
	fn Reducer[Out, In], init Out,
) stream.Processor[In, Out]

ReduceFrom constructs a processor that reduces the messages it sees into some form of aggregated messages, based on the provided function and an initial message

func SidechainTo

func SidechainTo[Msg any](ch chan<- Msg) stream.Processor[Msg, Msg]

func Sink

func Sink[Msg any]() stream.Processor[Msg, stream.Sink]

func SinkInto

func SinkInto[Msg any](ch chan<- Msg) stream.Processor[Msg, stream.Sink]

func Split

func Split[In, Out any](
	p ...stream.Processor[In, Out],
) stream.Processor[In, stream.Sink]

func Subprocess

func Subprocess[Msg any](
	p ...stream.Processor[Msg, Msg],
) stream.Processor[Msg, Msg]

Subprocess is the internal implementation of a Subprocess

func TableLookup

func TableLookup[Msg any, Key comparable, Value any](
	t table.Table[Key, Value],
	c table.ColumnName,
	k table.KeySelector[Msg, Key],
) (stream.Processor[Msg, Value], error)

TableLookup performs a lookup on a table using the provided message. The Key extracts a Key from this message and uses it to perform the lookup against the Table. The Column returned by the lookup is forwarded to the next Processor

func TableUpdater

func TableUpdater[Msg any, Key comparable, Value any](
	t table.Updater[Msg, Key, Value],
) stream.Processor[Msg, Msg]

TableUpdater constructs a processor that sends all messages it sees to the provided table Updater

func TopicConsumer

func TopicConsumer[Msg any](
	t topic.Topic[Msg],
) stream.Processor[stream.Source, Msg]

TopicConsumer constructs a processor that receives from the provided Topic every time it's invoked by the Stream

func TopicProducer

func TopicProducer[Msg any](t topic.Topic[Msg]) stream.Processor[Msg, Msg]

TopicProducer constructs a processor that sends all messages it sees to the provided Topic

Types

type BinaryOperator

type BinaryOperator[Left, Right, Out any] func(Left, Right) Out

BinaryOperator combines the left and right messages into some new result

type BinaryPredicate

type BinaryPredicate[Left, Right any] func(Left, Right) bool

BinaryPredicate is the signature for a function that can perform Stream joining. Returning true will bind the messages in the Stream

type ForEachFunc

type ForEachFunc[Msg any] func(Msg)

ForEachFunc is the signature for a function that can perform some action on the incoming messages of a Stream.

type Generator

type Generator[Msg any] func() (Msg, bool)

type GroupSelector

type GroupSelector[Msg any, Key comparable] func(Msg) Key

type Grouped

type Grouped[Msg any, Key comparable] struct {
	// contains filtered or unexported fields
}

func (*Grouped[_, Key]) Key

func (g *Grouped[_, Key]) Key() Key

func (*Grouped[Msg, _]) Message

func (g *Grouped[Msg, _]) Message() Msg

type Mapper

type Mapper[From, To any] func(From) To

Mapper is the signature for a function that can perform Stream mapping. The message that is returned will be passed downstream

type Predicate

type Predicate[Msg any] func(Msg) bool

Predicate is the signature for a function that can perform Stream filtering. Returning false will drop the message from the Stream

type Reducer

type Reducer[Res, Msg any] func(Res, Msg) Res

Reducer is the signature for a function that can perform Stream reduction. The message that is returned will be passed downstream

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL