pipe

package
v0.0.0-...-3a937ec Latest
Warning: This package is not in the latest version of its module.

Published: Feb 1, 2018 License: MIT Imports: 4 Imported by: 0

Documentation

Types

type Blackhole

type Blackhole struct{}

Blackhole consumes messages and silently discards them.

func (*Blackhole) ConsumeMessage

func (*Blackhole) ConsumeMessage(message bus.Message) (err error)

type Divert

type Divert struct {
	// contains filtered or unexported fields
}

Divert propagates messages to the downstream Consumer unless it is in divert mode. On entering divert mode, the pipe sends the predefined divert message downstream. On exiting divert mode, the pipe sends the last consumed message downstream.

func NewDivert

func NewDivert(consumer bus.Consumer, divert bus.Message) (d *Divert)

NewDivert creates a new Divert pipe with the given consumer and divert message.

func (*Divert) ConsumeMessage

func (d *Divert) ConsumeMessage(message bus.Message) (err error)

ConsumeMessage consumes a message from upstream and resends it downstream when not in divert mode. In divert mode it sends the predefined divert message instead.

func (*Divert) Divert

func (d *Divert) Divert(on bool)

Divert sets the state of the Divert pipe: on switches divert mode on or off.
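The divert behaviour described above can be sketched as follows. This is an illustrative reimplementation, not the package's code: Message and Consumer are hypothetical stand-ins for bus.Message and bus.Consumer (their real definitions are not shown on this page), and the names divert, newDivert, recorder and divertDemo are invented for the example. The sketch also assumes that calling Divert with the current state is a no-op and that delivery errors inside Divert are dropped, since the method returns nothing.

```go
package main

import (
	"fmt"
	"sync"
)

// Message and Consumer are hypothetical stand-ins for bus.Message and
// bus.Consumer; the real definitions are not shown on this page.
type Message struct {
	ID      string
	Payload string
}

type Consumer interface {
	ConsumeMessage(message Message) error
}

// recorder is a test consumer that remembers every payload it receives.
type recorder struct{ got []string }

func (r *recorder) ConsumeMessage(m Message) error {
	r.got = append(r.got, m.Payload)
	return nil
}

// divert is an illustrative reimplementation, not the package's Divert type.
type divert struct {
	mu       sync.Mutex
	consumer Consumer
	fallback Message // predefined divert message
	last     Message // most recently consumed upstream message
	hasLast  bool
	on       bool
}

func newDivert(c Consumer, fallback Message) *divert {
	return &divert{consumer: c, fallback: fallback}
}

// ConsumeMessage remembers the message, then forwards either the message
// itself or, in divert mode, the predefined divert message.
func (d *divert) ConsumeMessage(m Message) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.last, d.hasLast = m, true
	if d.on {
		return d.consumer.ConsumeMessage(d.fallback)
	}
	return d.consumer.ConsumeMessage(m)
}

// Divert switches the mode and emits the transition message.
func (d *divert) Divert(on bool) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.on == on {
		return // assumption: repeated calls with the same state do nothing
	}
	d.on = on
	if on {
		_ = d.consumer.ConsumeMessage(d.fallback) // entering divert mode
	} else if d.hasLast {
		_ = d.consumer.ConsumeMessage(d.last) // leaving divert mode
	}
}

func divertDemo() []string {
	rec := &recorder{}
	d := newDivert(rec, Message{ID: "status", Payload: "maintenance"})
	_ = d.ConsumeMessage(Message{ID: "status", Payload: "a"})
	d.Divert(true)
	_ = d.ConsumeMessage(Message{ID: "status", Payload: "b"})
	d.Divert(false)
	return rec.got
}

func main() {
	fmt.Println(divertDemo()) // [a maintenance maintenance b]
}
```

Note that "b" reaches the downstream only after divert mode is switched off, which is how the pipe replays the last consumed message on exit.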

type Fn

type Fn struct {
	Downstream bus.Consumer
	Fn         func(message bus.Message) bus.Message
}

func NewFn

func NewFn(fn func(message bus.Message) bus.Message, downstream bus.Consumer) (p *Fn)

func (*Fn) ConsumeMessage

func (p *Fn) ConsumeMessage(message bus.Message) (err error)

func (*Fn) GetConsumer

func (p *Fn) GetConsumer() (c bus.Consumer)
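Fn carries no description on this page, so the following is only a guess at its intent based on the field names and signatures: a pipe that applies Fn to each consumed message and hands the result to Downstream. Message and Consumer are hypothetical stand-ins for bus.Message and bus.Consumer, and fnPipe, newFn, recorder and fnDemo are names invented for the sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// Message and Consumer are hypothetical stand-ins for bus.Message and
// bus.Consumer; the real definitions are not shown on this page.
type Message struct {
	ID      string
	Payload string
}

type Consumer interface {
	ConsumeMessage(message Message) error
}

// recorder is a test consumer that remembers every payload it receives.
type recorder struct{ got []string }

func (r *recorder) ConsumeMessage(m Message) error {
	r.got = append(r.got, m.Payload)
	return nil
}

// fnPipe mirrors the exported fields of Fn. Its behaviour is assumed.
type fnPipe struct {
	Downstream Consumer
	Fn         func(message Message) Message
}

func newFn(fn func(message Message) Message, downstream Consumer) *fnPipe {
	return &fnPipe{Downstream: downstream, Fn: fn}
}

// ConsumeMessage transforms the message and forwards the result (assumed).
func (p *fnPipe) ConsumeMessage(m Message) error {
	return p.Downstream.ConsumeMessage(p.Fn(m))
}

// GetConsumer returns the downstream consumer (assumed).
func (p *fnPipe) GetConsumer() Consumer { return p.Downstream }

func fnDemo() string {
	rec := &recorder{}
	upper := func(m Message) Message {
		m.Payload = strings.ToUpper(m.Payload)
		return m
	}
	p := newFn(upper, rec)
	_ = p.ConsumeMessage(Message{ID: "greeting", Payload: "hello"})
	return rec.got[0]
}

func main() {
	fmt.Println(fnDemo()) // HELLO
}
```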

type Lift

type Lift struct {
	// contains filtered or unexported fields
}

Lift holds a catalog of consumed map[string]string messages and propagates them downstream as name:map[message-id+k]=v. To reset the catalog, send a map[id]map[k]string message with an empty message ID.

func NewLift

func NewLift(name string, consumer bus.Consumer) (p *Lift)

func (*Lift) ConsumeMessage

func (p *Lift) ConsumeMessage(message bus.Message) (err error)

type Slice

type Slice struct {
	// contains filtered or unexported fields
}

Slice accepts map[string]interface{} messages and converts them to []interface{}.

func NewSlice

func NewSlice(log *logx.Log, consumer bus.Consumer) (p *Slice)

func (*Slice) ConsumeMessage

func (p *Slice) ConsumeMessage(message bus.Message) (err error)

type StrictPipe

type StrictPipe struct {
	// contains filtered or unexported fields
}

StrictPipe consumes only flatmap messages with predeclared IDs. If any declared message, or the consumed message itself, is empty, StrictPipe sends an empty message downstream. If all messages are non-empty, StrictPipe combines them as <message-id>.<flatmap-key> = <flatmap-value> and sends the result downstream.

func NewStrict

func NewStrict(name string, log *logx.Log, downstream bus.Consumer, declared ...string) (p *StrictPipe)

func (*StrictPipe) ConsumeMessage

func (p *StrictPipe) ConsumeMessage(message bus.Message) (err error)
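The combining rule described for StrictPipe can be sketched roughly as below. Several details are assumptions rather than documented behaviour: a flatmap is modelled as map[string]string, messages with undeclared IDs are ignored, the outgoing message uses the pipe name as its ID, and the *logx.Log parameter is omitted. Message and Consumer are hypothetical stand-ins for bus.Message and bus.Consumer; strict, newStrict, recorder and strictDemo are names invented for the example.

```go
package main

import "fmt"

// Message and Consumer are hypothetical stand-ins for bus.Message and
// bus.Consumer; the real definitions are not shown on this page.
type Message struct {
	ID      string
	Payload map[string]string // assumed flatmap representation
}

type Consumer interface {
	ConsumeMessage(message Message) error
}

// recorder is a test consumer that remembers every payload it receives.
type recorder struct{ got []map[string]string }

func (r *recorder) ConsumeMessage(m Message) error {
	r.got = append(r.got, m.Payload)
	return nil
}

// strict is an illustrative reimplementation, not the package's StrictPipe.
type strict struct {
	name       string
	downstream Consumer
	state      map[string]map[string]string // declared ID -> last payload
}

func newStrict(name string, downstream Consumer, declared ...string) *strict {
	s := &strict{name: name, downstream: downstream, state: map[string]map[string]string{}}
	for _, id := range declared {
		s.state[id] = nil // declared but not yet received counts as empty
	}
	return s
}

func (s *strict) ConsumeMessage(m Message) error {
	if _, ok := s.state[m.ID]; !ok {
		return nil // assumption: undeclared IDs are ignored
	}
	s.state[m.ID] = m.Payload
	out := Message{ID: s.name, Payload: map[string]string{}}
	for id, payload := range s.state {
		if len(payload) == 0 {
			// At least one declared message is empty: send an empty message.
			return s.downstream.ConsumeMessage(Message{ID: s.name, Payload: map[string]string{}})
		}
		for k, v := range payload {
			out.Payload[id+"."+k] = v // <message-id>.<flatmap-key> = <flatmap-value>
		}
	}
	return s.downstream.ConsumeMessage(out)
}

func strictDemo() []map[string]string {
	rec := &recorder{}
	s := newStrict("combined", rec, "a", "b")
	_ = s.ConsumeMessage(Message{ID: "a", Payload: map[string]string{"x": "1"}}) // "b" still empty
	_ = s.ConsumeMessage(Message{ID: "b", Payload: map[string]string{"y": "2"}}) // all present
	_ = s.ConsumeMessage(Message{ID: "a", Payload: map[string]string{}})         // "a" emptied
	return rec.got
}

func main() {
	for _, payload := range strictDemo() {
		fmt.Println(len(payload), payload)
	}
}
```

In this run the downstream receives an empty message, then the combined map with keys a.x and b.y, then an empty message again.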

type Tee

type Tee struct {
	// contains filtered or unexported fields
}

Tee replicates each consumed message to all given downstreams.

func NewTee

func NewTee(downstreams ...bus.Consumer) (p *Tee)

func (*Tee) ConsumeMessage

func (p *Tee) ConsumeMessage(message bus.Message) (err error)
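A fan-out in the spirit of Tee, with a Blackhole-style consumer as one of the targets, might look like the sketch below. Message and Consumer are hypothetical stand-ins for bus.Message and bus.Consumer, and tee, newTee, blackhole, failing, recorder and teeDemo are invented names. The error policy shown (keep delivering to the remaining downstreams and return the first error) is an assumption; this page does not say how the real Tee handles a failing downstream.

```go
package main

import (
	"errors"
	"fmt"
)

// Message and Consumer are hypothetical stand-ins for bus.Message and
// bus.Consumer; the real definitions are not shown on this page.
type Message struct {
	ID      string
	Payload string
}

type Consumer interface {
	ConsumeMessage(message Message) error
}

// blackhole discards every message, like the Blackhole pipe.
type blackhole struct{}

func (blackhole) ConsumeMessage(Message) error { return nil }

// failing always reports an error, to exercise the error path.
type failing struct{}

func (failing) ConsumeMessage(Message) error { return errors.New("downstream failed") }

// recorder is a test consumer that remembers every payload it receives.
type recorder struct{ got []string }

func (r *recorder) ConsumeMessage(m Message) error {
	r.got = append(r.got, m.Payload)
	return nil
}

// tee is an illustrative reimplementation, not the package's Tee type.
type tee struct{ downstreams []Consumer }

func newTee(downstreams ...Consumer) *tee { return &tee{downstreams: downstreams} }

// ConsumeMessage delivers the message to every downstream in order.
// Assumption: delivery continues after a failure and the first error wins.
func (t *tee) ConsumeMessage(m Message) error {
	var first error
	for _, d := range t.downstreams {
		if err := d.ConsumeMessage(m); err != nil && first == nil {
			first = err
		}
	}
	return first
}

func teeDemo() (a, b []string, err error) {
	ra, rb := &recorder{}, &recorder{}
	t := newTee(ra, blackhole{}, failing{}, rb)
	err = t.ConsumeMessage(Message{ID: "m", Payload: "hello"})
	return ra.got, rb.got, err
}

func main() {
	a, b, err := teeDemo()
	fmt.Println(a, b, err) // [hello] [hello] downstream failed
}
```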
