tombstreams

package module
v0.0.0-...-9a15792 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 8, 2021 License: MIT Imports: 4 Imported by: 0

README

tombstreams

A stream processing library for Go that combines go-streams and tombs.

Most of the flow capabilities from go-streams are supported. Go channels are the only supported connector. Tombs were added to provide a way to cancel a pipeline and surface any errors.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DoStream

func DoStream(outlet Outlet, inlet Inlet)

DoStream streams data from the outlet to inlet.

func GenerateIDs

func GenerateIDs(ctx context.Context, ids []string) <-chan interface{}

Types

type ChanSink

type ChanSink struct {
	Out chan interface{}
}

ChanSink sends data to the output channel

func NewChanSink

func NewChanSink(out chan interface{}) *ChanSink

NewChanSink returns a new ChanSink instance

func (*ChanSink) In

func (ch *ChanSink) In() chan<- interface{}

In returns an input channel for receiving data

type ChanSource

type ChanSource struct {
	// contains filtered or unexported fields
}

ChanSource streams data from the input channel

func NewChanSource

func NewChanSource(t *tomb.Tomb, in <-chan interface{}) *ChanSource

NewChanSource returns a new ChanSource instance

func (*ChanSource) Out

func (cs *ChanSource) Out() <-chan interface{}

Out returns an output channel for sending data

func (*ChanSource) Tomb

func (cs *ChanSource) Tomb() *tomb.Tomb

Tomb returns the tomb context

func (*ChanSource) Via

func (cs *ChanSource) Via(_flow Flow) Flow

Via streams data through the given flow

type Filter

type Filter struct {
	FilterF FilterFunc
	// contains filtered or unexported fields
}

Filter filters the incoming elements using a predicate. If the predicate returns true the element is passed downstream, if it returns false the element is discarded.

in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

    |    |      |    |        |
[---------- FilterFunc -----------]
    |    |                    |

out -- 1 -- 2 ------------------ 5 --

func NewFilter

func NewFilter(t *tomb.Tomb, filterFunc FilterFunc, parallelism uint) *Filter

NewFilter returns a new Filter instance. filterFunc is the filter predicate function. parallelism is the flow parallelism factor. In case the events order matters, use parallelism = 1.

func (*Filter) In

func (f *Filter) In() chan<- interface{}

In returns an input channel for receiving data

func (*Filter) Out

func (f *Filter) Out() <-chan interface{}

Out returns an output channel for sending data

func (*Filter) To

func (f *Filter) To(sink Sink)

To streams data to the given sink

func (*Filter) Tomb

func (f *Filter) Tomb() *tomb.Tomb

func (*Filter) Via

func (f *Filter) Via(flow Flow) Flow

Via streams data through the given flow

type FilterFunc

type FilterFunc func(interface{}) (bool, error)

FilterFunc is a filter predicate function.

type FlatMap

type FlatMap struct {
	FlatMapF FlatMapFunc
	// contains filtered or unexported fields
}

FlatMap takes one element and produces zero, one, or more elements.

in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

    |    |      |    |        |
[---------- FlatMapFunc ----------]
    |    |           |   |    |

out -- 1' - 2' -------- 4'- 4”- 5' -

func NewFlatMap

func NewFlatMap(t *tomb.Tomb, flatMapFunc FlatMapFunc, parallelism uint) *FlatMap

NewFlatMap returns a new FlatMap instance. flatMapFunc is the FlatMap transformation function. parallelism is the flow parallelism factor. In case the events order matters, use parallelism = 1.

func (*FlatMap) In

func (fm *FlatMap) In() chan<- interface{}

In returns an input channel for receiving data

func (*FlatMap) Out

func (fm *FlatMap) Out() <-chan interface{}

Out returns an output channel for sending data

func (*FlatMap) To

func (fm *FlatMap) To(sink Sink)

To streams data to the given sink

func (*FlatMap) Tomb

func (fm *FlatMap) Tomb() *tomb.Tomb

func (*FlatMap) Via

func (fm *FlatMap) Via(flow Flow) Flow

Via streams data through the given flow

type FlatMapFunc

type FlatMapFunc func(interface{}) ([]interface{}, error)

FlatMapFunc is a FlatMap transformation function.

type Flow

type Flow interface {
	Inlet
	Outlet
	Via(Flow) Flow
	To(Sink)
}

Flow is a set of stream processing steps that has one open input and one open output.

func FanOut

func FanOut(outlet Outlet, magnitude int) []Flow

FanOut creates a number of identical flows from the single outlet. This can be useful when writing to multiple sinks is required.

func Merge

func Merge(outlets ...Flow) Flow

Merge merges multiple flows into a single flow.

type IgnoreSink

type IgnoreSink struct {
	// contains filtered or unexported fields
}

IgnoreSink sends items to /dev/null

func NewIgnoreSink

func NewIgnoreSink(t *tomb.Tomb) *IgnoreSink

NewIgnoreSink returns a new IgnoreSink instance

func (*IgnoreSink) In

func (ignore *IgnoreSink) In() chan<- interface{}

In returns an input channel for receiving data

type Inlet

type Inlet interface {
	In() chan<- interface{}
}

Inlet is a type that exposes one open input. Implemented by the Flow and Sink.

type Map

type Map struct {
	MapF MapFunc
	// contains filtered or unexported fields
}

Map takes one element and produces one element.

in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

    |    |      |    |        |
[----------- MapFunc -------------]
    |    |      |    |        |

out -- 1' - 2' --- 3' - 4' ----- 5' -

func NewMap

func NewMap(t *tomb.Tomb, mapFunc MapFunc, parallelism uint) *Map

NewMap returns a new Map instance. mapFunc is the Map transformation function. parallelism is the flow parallelism factor. In case the events order matters, use parallelism = 1.

func (*Map) In

func (m *Map) In() chan<- interface{}

In returns an input channel for receiving data

func (*Map) Out

func (m *Map) Out() <-chan interface{}

Out returns an output channel for sending data

func (*Map) To

func (m *Map) To(sink Sink)

To streams data to the given sink

func (*Map) Tomb

func (m *Map) Tomb() *tomb.Tomb

func (*Map) Via

func (m *Map) Via(flow Flow) Flow

Via streams data through the given flow

type MapFunc

type MapFunc func(interface{}) (interface{}, error)

MapFunc is a Map transformation function.

type Outlet

type Outlet interface {
	Out() <-chan interface{}
	Tomb() *tomb.Tomb
}

Outlet is a type that exposes one open output. Implemented by the Source and Flow.

type PassThrough

type PassThrough struct {
	// contains filtered or unexported fields
}

PassThrough produces the received element as is.

in -- 1 -- 2 ---- 3 -- 4 ------ 5 --

|    |      |    |        |

out -- 1 -- 2 ---- 3 -- 4 ------ 5 --

func NewPassThrough

func NewPassThrough(t *tomb.Tomb) *PassThrough

NewPassThrough returns a new PassThrough instance.

func (*PassThrough) In

func (pt *PassThrough) In() chan<- interface{}

In returns an input channel for receiving data

func (*PassThrough) Out

func (pt *PassThrough) Out() <-chan interface{}

Out returns an output channel for sending data

func (*PassThrough) To

func (pt *PassThrough) To(sink Sink)

To streams data to the given sink

func (*PassThrough) Tomb

func (pt *PassThrough) Tomb() *tomb.Tomb

func (*PassThrough) Via

func (pt *PassThrough) Via(flow Flow) Flow

Via streams data through the given flow

type Sink

type Sink interface {
	Inlet
}

Sink is a set of stream processing steps that has one open input. Can be used as a Subscriber.

type Source

type Source interface {
	Outlet
	Via(Flow) Flow
}

Source is a set of stream processing steps that has one open output.

type StdoutSink

type StdoutSink struct {
	// contains filtered or unexported fields
}

StdoutSink sends items to stdout

func NewStdoutSink

func NewStdoutSink(t *tomb.Tomb) *StdoutSink

NewStdoutSink returns a new StdoutSink instance

func (*StdoutSink) In

func (stdout *StdoutSink) In() chan<- interface{}

In returns an input channel for receiving data

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL