streams

package module
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2025 License: Apache-2.0 Imports: 3 Imported by: 1

README

🏄♀ Streams

Release Go Reference Go Report Card Taylor Swift Volkswagen

A teeny-tiny package to create stream processing workloads.

Docs

You can find the documentation hosted on godoc.org.

Examples

See the examples directory for more.

Operators

  • Do: Execute a function for each element in the stream.
  • Filter: Filter elements from the stream.
  • Map: Transform elements in the stream.
  • Reduce: Reduce elements in the stream.
  • Split: Split the stream into multiple streams.
  • Merge: Merge multiple streams into one.
  • FlatMap: Transform elements in the stream into multiple elements.
  • Skip: Skip elements in the stream.

License

Apache 2.0

Documentation

Overview

Package streams provides a set of functions to work with event streams. The package is designed to be used with Kafka, NATS, and other message brokers.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Pipe added in v0.3.2

func Pipe(stream Streamable, rev Receivable)

Pipe pipes the output channel to the input channel.

Types

type Do added in v0.3.2

type Do[T any] struct {
	// contains filtered or unexported fields
}

Do takes one element and executes a function on it.

func NewDo added in v0.3.2

func NewDo[T any](fn DoFunc[T]) *Do[T]

NewDo creates a new Do.

func (*Do[T]) In added in v0.3.2

func (d *Do[T]) In() chan<- any

In returns the input channel.

func (*Do[T]) Out added in v0.3.2

func (d *Do[T]) Out() <-chan any

Out returns the output channel.

func (*Do[T]) Pipe added in v0.3.2

func (d *Do[T]) Pipe(c Operatable) Operatable

Pipe pipes the output channel to the input channel.

func (*Do[T]) To added in v0.3.2

func (d *Do[T]) To(sink Sinkable)

To streams data to the sink and waits for it to complete.

type DoFunc added in v0.3.2

type DoFunc[T any] func(T) error

DoFunc is a function that executes on the element.

type Filter added in v0.3.2

type Filter[T any] struct {
	// contains filtered or unexported fields
}

Filter filters an incoming element using a filter predicate.

func NewFilter added in v0.3.2

func NewFilter[T any](fn FilterPredicate[T]) *Filter[T]

NewFilter returns a new operator on filters.

func (*Filter[T]) In added in v0.3.2

func (f *Filter[T]) In() chan<- any

In returns the input channel.

func (*Filter[T]) Out added in v0.3.2

func (f *Filter[T]) Out() <-chan any

Out returns the output channel.

func (*Filter[T]) Pipe added in v0.3.2

func (f *Filter[T]) Pipe(c Operatable) Operatable

Pipe pipes the output channel to the input channel.

func (*Filter[T]) To added in v0.3.2

func (f *Filter[T]) To(sink Sinkable)

To streams data to the sink and waits for it to complete.

type FilterPredicate added in v0.3.2

type FilterPredicate[T any] func(T) bool

FilterPredicate represents a filter predicate.

type FlatMap added in v0.3.2

type FlatMap[T, R any] struct {
	// contains filtered or unexported fields
}

FlatMap takes one element and produces a new element of the same type.

func NewFlatMap added in v0.3.2

func NewFlatMap[T, R any](fn FlatMapFunc[T, R]) *FlatMap[T, R]

NewFlatMap returns a new operator on maps.

func (*FlatMap[T, R]) In added in v0.3.2

func (f *FlatMap[T, R]) In() chan<- any

In returns the input channel.

func (*FlatMap[T, R]) Out added in v0.3.2

func (f *FlatMap[T, R]) Out() <-chan any

Out returns the output channel.

func (*FlatMap[T, R]) Pipe added in v0.3.2

func (f *FlatMap[T, R]) Pipe(c Operatable) Operatable

Pipe pipes the output channel to the input channel.

func (*FlatMap[T, R]) To added in v0.3.2

func (f *FlatMap[T, R]) To(sink Sinkable)

To streams data to the sink and waits for it to complete.

type FlatMapFunc added in v0.3.2

type FlatMapFunc[T, R any] func(T) []R

FlatMap takes one element and produces a new element of the same type.

type Log added in v0.3.2

type Log struct {
	// contains filtered or unexported fields
}

Log passes through an incoming element.

func NewLog added in v0.3.2

func NewLog(fn logx.LogFunc) *Log

NewLog returns a new operator to log elements.

func (*Log) In added in v0.3.2

func (l *Log) In() chan<- any

In returns the input channel.

func (*Log) Out added in v0.3.2

func (l *Log) Out() <-chan any

Out returns the output channel.

func (*Log) Pipe added in v0.3.2

func (l *Log) Pipe(c Operatable) Operatable

Pipe pipes the output channel to the input channel.

func (*Log) To added in v0.3.2

func (l *Log) To(sink Sinkable)

To streams data to the sink and waits for it to complete.

type Map added in v0.3.2

type Map[T, R any] struct {
	// contains filtered or unexported fields
}

Map takes one element and produces a new element of the same type.

func NewMap added in v0.3.2

func NewMap[T, R any](fn MapFunc[T, R]) *Map[T, R]

NewMap returns a new operator on maps.

func (*Map[T, R]) In added in v0.3.2

func (m *Map[T, R]) In() chan<- any

In returns the input channel.

func (*Map[T, R]) Out added in v0.3.2

func (m *Map[T, R]) Out() <-chan any

Out returns the output channel.

func (*Map[T, R]) Pipe added in v0.3.2

func (m *Map[T, R]) Pipe(c Operatable) Operatable

Pipe pipes the output channel to the input channel.

func (*Map[T, R]) To added in v0.3.2

func (m *Map[T, R]) To(sink Sinkable)

To streams data to the sink and waits for it to complete.

type MapFunc added in v0.3.2

type MapFunc[T, R any] func(T) R

MapFunc is a function that takes a key and a value and returns a new value.

type Node

type Node interface {
	// AddChildren adds children to a node.
	AddChild(nodes ...Node)
	// Children returns the children of a node.
	Children() []Node
	// Name returns the name of a node.
	Name(names ...string) string
}

Node is a node in a topology.

func NewNode

func NewNode(name string) Node

NewNode is a constructor for a new node in the topology.

type Operatable added in v0.3.2

type Operatable interface {
	Streamable
	Receivable
	// To streams data to the sink and waits for it to complete.
	To(sink Sinkable)
}

Operatable is a Operatable interface.

func FanOut added in v0.3.2

func FanOut(in Streamable, num int) []Operatable

FanOut fans out a stream to multiple streams.

func Flatten added in v0.3.2

func Flatten[T any]() Operatable

Flatten creates a flatten stream.

func Merge added in v0.3.2

func Merge(in ...Streamable) Operatable

Merge merges multiple streams into one.

func Split added in v0.3.2

func Split[T any](in Streamable, predicate FilterPredicate[T]) [2]Operatable

Split splits a stream in two based on a predicate.

type PassThrough added in v0.3.2

type PassThrough struct {
	// contains filtered or unexported fields
}

PassThrough passes through an incoming element.

func NewPassThrough added in v0.3.2

func NewPassThrough() *PassThrough

NewPassThrough returns a new operator on pass-throughs.

func (*PassThrough) In added in v0.3.2

func (p *PassThrough) In() chan<- any

In returns the input channel.

func (*PassThrough) Out added in v0.3.2

func (p *PassThrough) Out() <-chan any

Out returns the output channel.

func (*PassThrough) Pipe added in v0.3.2

func (p *PassThrough) Pipe(c Operatable) Operatable

Pipe pipes the output channel to the input channel.

func (*PassThrough) To added in v0.3.2

func (p *PassThrough) To(sink Sinkable)

To streams data to the sink and waits for it to complete.

type Receivable added in v0.3.2

type Receivable interface {
	// In returns the input channel.
	In() chan<- any
}

Receivable is a receivable interface.

type Reduce added in v0.3.2

type Reduce[T any] struct {
	// contains filtered or unexported fields
}

Reduce takes the current element and the latest reduced value and produces a new reduced value.

func NewReduce added in v0.3.2

func NewReduce[T any](fn ReduceFunc[T]) *Reduce[T]

NewReduce returns a new operator on reduces.

func (*Reduce[T]) In added in v0.3.2

func (r *Reduce[T]) In() chan<- any

In returns the input channel.

func (*Reduce[T]) Out added in v0.3.2

func (r *Reduce[T]) Out() <-chan any

Out returns the output channel.

func (*Reduce[T]) Pipe added in v0.3.2

func (r *Reduce[T]) Pipe(c Operatable) Operatable

Pipe pipes the output channel to the input channel.

func (*Reduce[T]) To added in v0.3.2

func (r *Reduce[T]) To(sink Sinkable)

To streams data to the sink and waits for it to complete.

type ReduceFunc added in v0.3.2

type ReduceFunc[T any] func(T, T) T

ReduceFunc combines the current element with the latest reduced value.

type Sinkable added in v0.3.2

type Sinkable interface {
	Receivable
	// Wait waits for the sink to complete.
	Wait()
	// Error returns the error.
	Error() error
}

Sinkable is a sinkable interface.

type Skip added in v0.3.2

type Skip struct {
	// contains filtered or unexported fields
}

Skip skips the first n elements.

func NewSkip added in v0.3.2

func NewSkip(n int) *Skip

NewSkip returns a new operator on skips.

func (*Skip) In added in v0.3.2

func (s *Skip) In() chan<- any

In returns the input channel.

func (*Skip) Out added in v0.3.2

func (s *Skip) Out() <-chan any

Out returns the output channel.

func (*Skip) Pipe added in v0.3.2

func (s *Skip) Pipe(c Operatable) Operatable

Pipe pipes the output channel to the input channel.

func (*Skip) To added in v0.3.2

func (s *Skip) To(sink Sinkable)

To streams data to the sink and waits for it to complete.

type Sourceable added in v0.3.2

type Sourceable interface {
	Streamable
	// Error returns the error.
	Error() error
}

Sourceable is a sourceable interface.

type Streamable added in v0.3.2

type Streamable interface {
	// Out returns the output channel.
	Out() <-chan any
	// Pipe pipes the output channel to the input channel.
	Pipe(Operatable) Operatable
}

Streamable is a streamable interface.

type Topology

type Topology interface {
	// Root returns the root node of a topology.
	Root() Node
}

Topology is a graph of nodes.

func NewTopology

func NewTopology(root Node) Topology

NewTopology is a constructor for Topology.

Directories

Path Synopsis
examples
reader command
std command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL