Documentation ¶
Overview ¶
Package streams provides a set of functions to work with event streams. The package is designed to be used with Kafka, NATS, and other message brokers.
Index ¶
- func Pipe(stream Streamable, rev Receivable)
- type Do
- type DoFunc
- type Filter
- type FilterPredicate
- type FlatMap
- type FlatMapFunc
- type Log
- type Map
- type MapFunc
- type Node
- type Operatable
- type PassThrough
- type Receivable
- type Reduce
- type ReduceFunc
- type Sinkable
- type Skip
- type Sourceable
- type Streamable
- type Topology
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Pipe ¶ added in v0.3.2
func Pipe(stream Streamable, rev Receivable)
Pipe connects the output channel of stream to the input channel of rev, so that elements emitted by stream are delivered to rev.
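The idea of piping one channel into another can be sketched with plain Go channels. This is only an illustration of the concept, not the package's implementation: `pipe` and `collect` are hypothetical helpers, and closing the input once the output is drained is an assumption.

```go
package main

import "fmt"

// pipe is a hypothetical stand-in for Pipe: it forwards every element from
// out to in on its own goroutine and closes in once out is drained.
// Closing in is an assumption; the package may handle shutdown differently.
func pipe(out <-chan any, in chan<- any) {
	go func() {
		defer close(in)
		for v := range out {
			in <- v
		}
	}()
}

// collect drains a channel into a slice.
func collect(ch <-chan any) []any {
	var got []any
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	src := make(chan any)
	dst := make(chan any)

	pipe(src, dst)

	// Produce three elements, then signal end of stream.
	go func() {
		defer close(src)
		for _, v := range []any{1, 2, 3} {
			src <- v
		}
	}()

	fmt.Println(collect(dst)) // [1 2 3]
}
```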
Types ¶
type Do ¶ added in v0.3.2
type Do[T any] struct {
// contains filtered or unexported fields
}
Do takes one element and executes a function on it.
func (*Do[T]) Pipe ¶ added in v0.3.2
func (d *Do[T]) Pipe(c Operatable) Operatable
Pipe pipes the output channel to the input channel.
type Filter ¶ added in v0.3.2
type Filter[T any] struct {
// contains filtered or unexported fields
}
Filter filters an incoming element using a filter predicate.
func NewFilter ¶ added in v0.3.2
func NewFilter[T any](fn FilterPredicate[T]) *Filter[T]
NewFilter returns a new Filter operator that uses fn as its predicate.
func (*Filter[T]) Pipe ¶ added in v0.3.2
func (f *Filter[T]) Pipe(c Operatable) Operatable
Pipe pipes the output channel to the input channel.
type FilterPredicate ¶ added in v0.3.2
FilterPredicate represents a filter predicate.
type FlatMap ¶ added in v0.3.2
type FlatMap[T, R any] struct {
// contains filtered or unexported fields
}
FlatMap takes one element of type T and produces zero or more elements of type R.
func NewFlatMap ¶ added in v0.3.2
func NewFlatMap[T, R any](fn FlatMapFunc[T, R]) *FlatMap[T, R]
NewFlatMap returns a new FlatMap operator that applies fn to each element.
func (*FlatMap[T, R]) Pipe ¶ added in v0.3.2
func (f *FlatMap[T, R]) Pipe(c Operatable) Operatable
Pipe pipes the output channel to the input channel.
type FlatMapFunc ¶ added in v0.3.2
type FlatMapFunc[T, R any] func(T) []R
FlatMapFunc takes one element of type T and returns a slice of zero or more elements of type R.
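A minimal sketch of what a FlatMapFunc looks like in practice, applied over a slice rather than a stream. The `FlatMapFunc` type is redeclared locally with the signature shown above so the example compiles on its own; `flatMap` and `splitWords` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strings"
)

// FlatMapFunc mirrors the signature documented above.
type FlatMapFunc[T, R any] func(T) []R

// flatMap is a hypothetical helper that applies fn to every element and
// concatenates the results. An element may yield zero, one, or many outputs.
func flatMap[T, R any](in []T, fn FlatMapFunc[T, R]) []R {
	var out []R
	for _, v := range in {
		out = append(out, fn(v)...)
	}
	return out
}

// splitWords breaks a line into whitespace-separated words.
func splitWords(line string) []string {
	return strings.Fields(line)
}

func main() {
	fn := FlatMapFunc[string, string](splitWords)
	// The empty string contributes no output elements.
	fmt.Println(flatMap([]string{"a b", "", "c"}, fn)) // [a b c]
}
```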
type Log ¶ added in v0.3.2
type Log struct {
// contains filtered or unexported fields
}
Log passes through an incoming element.
func (*Log) Pipe ¶ added in v0.3.2
func (l *Log) Pipe(c Operatable) Operatable
Pipe pipes the output channel to the input channel.
type Map ¶ added in v0.3.2
type Map[T, R any] struct {
// contains filtered or unexported fields
}
Map takes one element of type T and produces one element of type R.
func (*Map[T, R]) Pipe ¶ added in v0.3.2
func (m *Map[T, R]) Pipe(c Operatable) Operatable
Pipe pipes the output channel to the input channel.
type MapFunc ¶ added in v0.3.2
type MapFunc[T, R any] func(T) R
MapFunc is a function that takes one element of type T and returns a new element of type R.
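As an illustration of the T-to-R conversion, the sketch below maps strings to their lengths. `MapFunc` is redeclared locally with the documented signature; `mapAll` and `length` are hypothetical names used only for this example.

```go
package main

import "fmt"

// MapFunc mirrors the signature documented above.
type MapFunc[T, R any] func(T) R

// mapAll is a hypothetical helper that applies fn to each element,
// producing exactly one output per input.
func mapAll[T, R any](in []T, fn MapFunc[T, R]) []R {
	out := make([]R, 0, len(in))
	for _, v := range in {
		out = append(out, fn(v))
	}
	return out
}

// length converts a string (T) into its byte length (R).
func length(s string) int {
	return len(s)
}

func main() {
	fn := MapFunc[string, int](length)
	fmt.Println(mapAll([]string{"go", "kafka", "nats"}, fn)) // [2 5 4]
}
```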
type Node ¶
type Node interface {
// AddChild adds children to a node.
AddChild(nodes ...Node)
// Children returns the children of a node.
Children() []Node
// Name returns the name of a node.
Name(names ...string) string
}
Node is a node in a topology.
type Operatable ¶ added in v0.3.2
type Operatable interface {
Streamable
Receivable
// To streams data to the sink and waits for it to complete.
To(sink Sinkable)
}
Operatable is an operator in a stream: it is both Streamable and Receivable, and it can stream its output to a sink.
func FanOut ¶ added in v0.3.2
func FanOut(in Streamable, num int) []Operatable
FanOut fans out a stream to multiple streams.
func Merge ¶ added in v0.3.2
func Merge(in ...Streamable) Operatable
Merge merges multiple streams into one.
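Merging is commonly done with a fan-in pattern. The sketch below shows that pattern on plain channels; it is not the package's Merge, and `merge`, `emit`, and `drainSorted` are hypothetical helpers. The interleaving of merged elements is not deterministic, so the example sorts before printing.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// merge is a hypothetical fan-in: one goroutine per input forwards elements
// to a shared output, which is closed after every input has been drained.
func merge(ins ...<-chan any) <-chan any {
	out := make(chan any)
	var wg sync.WaitGroup
	wg.Add(len(ins))
	for _, in := range ins {
		go func(in <-chan any) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit returns a channel that yields vals and is then closed.
func emit(vals ...int) <-chan any {
	ch := make(chan any)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

// drainSorted collects all ints from ch and sorts them, because the
// arrival order across merged inputs is unspecified.
func drainSorted(ch <-chan any) []int {
	var got []int
	for v := range ch {
		got = append(got, v.(int))
	}
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(drainSorted(merge(emit(1, 3), emit(2, 4)))) // [1 2 3 4]
}
```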
func Split ¶ added in v0.3.2
func Split[T any](in Streamable, predicate FilterPredicate[T]) [2]Operatable
Split splits a stream in two based on a predicate.
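A rough sketch of predicate-based splitting on plain channels follows. It assumes the predicate has the shape `func(T) bool` and that matching elements go to index 0 and the rest to index 1; neither detail is stated above, and `split` and `drainBoth` are hypothetical helpers rather than the package's code.

```go
package main

import (
	"fmt"
	"sync"
)

// split is a hypothetical stand-in for Split. Elements of type T that
// satisfy pred go to the first channel; everything else goes to the second.
// The ordering of the two outputs is an assumption.
func split[T any](in <-chan any, pred func(T) bool) [2]<-chan any {
	match, rest := make(chan any), make(chan any)
	go func() {
		defer close(match)
		defer close(rest)
		for v := range in {
			if t, ok := v.(T); ok && pred(t) {
				match <- v
			} else {
				rest <- v
			}
		}
	}()
	var outs [2]<-chan any
	outs[0], outs[1] = match, rest
	return outs
}

// drainBoth reads both outputs concurrently. Reading them one after the
// other would deadlock, because the channels are unbuffered.
func drainBoth(outs [2]<-chan any) (first, second []any) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for v := range outs[0] {
			first = append(first, v)
		}
	}()
	go func() {
		defer wg.Done()
		for v := range outs[1] {
			second = append(second, v)
		}
	}()
	wg.Wait()
	return first, second
}

func main() {
	in := make(chan any)
	go func() {
		defer close(in)
		for i := 1; i <= 6; i++ {
			in <- i
		}
	}()

	even, odd := drainBoth(split(in, func(n int) bool { return n%2 == 0 }))
	fmt.Println(even, odd) // [2 4 6] [1 3 5]
}
```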
type PassThrough ¶ added in v0.3.2
type PassThrough struct {
// contains filtered or unexported fields
}
PassThrough passes through an incoming element.
func NewPassThrough ¶ added in v0.3.2
func NewPassThrough() *PassThrough
NewPassThrough returns a new PassThrough operator.
func (*PassThrough) In ¶ added in v0.3.2
func (p *PassThrough) In() chan<- any
In returns the input channel.
func (*PassThrough) Out ¶ added in v0.3.2
func (p *PassThrough) Out() <-chan any
Out returns the output channel.
func (*PassThrough) Pipe ¶ added in v0.3.2
func (p *PassThrough) Pipe(c Operatable) Operatable
Pipe pipes the output channel to the input channel.
func (*PassThrough) To ¶ added in v0.3.2
func (p *PassThrough) To(sink Sinkable)
To streams data to the sink and waits for it to complete.
type Receivable ¶ added in v0.3.2
type Receivable interface {
// In returns the input channel.
In() chan<- any
}
Receivable is implemented by anything that accepts elements on an input channel.
type Reduce ¶ added in v0.3.2
type Reduce[T any] struct {
// contains filtered or unexported fields
}
Reduce takes the current element and the latest reduced value and produces a new reduced value.
func NewReduce ¶ added in v0.3.2
func NewReduce[T any](fn ReduceFunc[T]) *Reduce[T]
NewReduce returns a new Reduce operator that uses fn to combine elements.
func (*Reduce[T]) Pipe ¶ added in v0.3.2
func (r *Reduce[T]) Pipe(c Operatable) Operatable
Pipe pipes the output channel to the input channel.
type ReduceFunc ¶ added in v0.3.2
type ReduceFunc[T any] func(T, T) T
ReduceFunc combines the current element with the latest reduced value.
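To make the "latest reduced value" idea concrete, here is a small sketch of a running reduction over a slice. `ReduceFunc` is redeclared locally with the documented signature; `runningReduce` and `add` are hypothetical, and passing the accumulated value as the first argument is an assumption, since the argument order is not specified above.

```go
package main

import "fmt"

// ReduceFunc mirrors the signature documented above.
type ReduceFunc[T any] func(T, T) T

// runningReduce is a hypothetical helper that emits the reduced value after
// every element. The first element is emitted unchanged. The accumulator is
// passed as the first argument, which is an assumption about argument order.
func runningReduce[T any](in []T, fn ReduceFunc[T]) []T {
	out := make([]T, 0, len(in))
	var acc T
	for i, v := range in {
		if i == 0 {
			acc = v
		} else {
			acc = fn(acc, v)
		}
		out = append(out, acc)
	}
	return out
}

// add sums two ints.
func add(a, b int) int {
	return a + b
}

func main() {
	fmt.Println(runningReduce([]int{1, 2, 3, 4}, ReduceFunc[int](add))) // [1 3 6 10]
}
```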
type Sinkable ¶ added in v0.3.2
type Sinkable interface {
Receivable
// Wait waits for the sink to complete.
Wait()
// Error returns the error.
Error() error
}
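Sinkable is the terminal stage of a stream: it receives elements, can be waited on until it completes, and reports any error it encountered.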
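One way a custom sink could satisfy this interface is sketched below. The `Receivable` and `Sinkable` interfaces are redeclared locally so the example compiles on its own; `sliceSink` is a hypothetical type, and treating a closed input channel as the completion signal is an assumption.

```go
package main

import "fmt"

// Local copies of the interfaces documented above.
type Receivable interface {
	In() chan<- any
}

type Sinkable interface {
	Receivable
	Wait()
	Error() error
}

// sliceSink is a hypothetical sink that stores every element in memory.
type sliceSink struct {
	in   chan any
	done chan struct{}
	got  []any
}

// Compile-time check that sliceSink satisfies Sinkable.
var _ Sinkable = (*sliceSink)(nil)

func newSliceSink() *sliceSink {
	s := &sliceSink{in: make(chan any), done: make(chan struct{})}
	go func() {
		// Completion is signalled when the input channel is closed (assumption).
		defer close(s.done)
		for v := range s.in {
			s.got = append(s.got, v)
		}
	}()
	return s
}

func (s *sliceSink) In() chan<- any { return s.in }
func (s *sliceSink) Wait()          { <-s.done }
func (s *sliceSink) Error() error   { return nil }

func main() {
	sink := newSliceSink()

	in := sink.In()
	in <- 1
	in <- 2
	close(in)

	sink.Wait()
	fmt.Println(sink.got, sink.Error()) // [1 2] <nil>
}
```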
type Skip ¶ added in v0.3.2
type Skip struct {
// contains filtered or unexported fields
}
Skip skips the first n elements.
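The skipping behaviour can be sketched on a plain channel as follows. No constructor for Skip is listed above, so this does not show how to build one; `skip`, `emit`, and `collect` are hypothetical helpers that only illustrate dropping the first n elements.

```go
package main

import "fmt"

// skip is a hypothetical stand-in that drops the first n elements of in
// and forwards the remainder unchanged.
func skip(in <-chan any, n int) <-chan any {
	out := make(chan any)
	go func() {
		defer close(out)
		seen := 0
		for v := range in {
			if seen < n {
				seen++
				continue
			}
			out <- v
		}
	}()
	return out
}

// emit returns a channel that yields vals and is then closed.
func emit(vals ...int) <-chan any {
	ch := make(chan any)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

// collect drains a channel into a slice.
func collect(ch <-chan any) []any {
	var got []any
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(collect(skip(emit(1, 2, 3, 4, 5), 2))) // [3 4 5]
}
```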
func (*Skip) Pipe ¶ added in v0.3.2
func (s *Skip) Pipe(c Operatable) Operatable
Pipe pipes the output channel to the input channel.
type Sourceable ¶ added in v0.3.2
type Sourceable interface {
Streamable
// Error returns the error.
Error() error
}
Sourceable is the origin of a stream: it emits elements and reports any error it encountered.
type Streamable ¶ added in v0.3.2
type Streamable interface {
// Out returns the output channel.
Out() <-chan any
// Pipe pipes the output channel to the input channel.
Pipe(Operatable) Operatable
}
Streamable is implemented by anything that emits elements on an output channel and can be piped into an Operatable.