Documentation ¶
Index ¶
- Variables
- func ATPipelineContextCancel(p Pipeliner) error
- func ATPipelineStartAll(p Pipeliner) error
- func ATPipelineStats(p Pipeliner) error
- func ChannelAsSlice(in <-chan []byte) []string
- func SendContext(ctx context.Context, msg []byte, output chan<- []byte) error
- func SliceAsChannel(data []string, autoClose bool) chan []byte
- func SliceAsChannelRaw(data [][]byte, autoClose bool) chan []byte
- type Broadcaster
- type DataCounter
- type MultiError
- type Node
- type Pipeliner
- type ProcessArgs
- type Receiver
- type SimplePipeline
- type Stats
- type Worker
Constants ¶
This section is empty.
Variables ¶
var ErrAlreadyStarted = errors.New("node already started")
ErrAlreadyStarted is returned if the Start method is called more than once
var ErrStopNotStarted = errors.New("stopping a not started worker")
ErrStopNotStarted is returned when Stop is called before the Start method
Functions ¶
func ATPipelineContextCancel ¶
ATPipelineContextCancel checks that the context is propagated to all Nodes
func ATPipelineStartAll ¶
ATPipelineStartAll checks that all Nodes in a pipeline are started when pipeline.Start is called
func ATPipelineStats ¶ added in v0.3.0
func ChannelAsSlice ¶
ChannelAsSlice reads from the in channel until it is closed, then returns a slice with all messages received
func SendContext ¶ added in v0.3.0
SendContext tries to send msg to output; it returns an error if the context is canceled before msg is sent
func SliceAsChannel ¶
SliceAsChannel returns a channel that reads from a slice. If autoClose is true, the channel is closed after the last message is consumed
func SliceAsChannelRaw ¶
SliceAsChannelRaw is the same as SliceAsChannel, but takes [][]byte data
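These slice/channel helpers are small enough that a sketch clarifies both directions at once. The lowercase names below are assumed stand-ins for SliceAsChannel and ChannelAsSlice, not the package's actual source:

```go
package main

import "fmt"

// sliceAsChannel sketches the documented behavior: it returns a channel fed
// from the slice in a goroutine; when autoClose is true the channel is
// closed after the last message is sent.
func sliceAsChannel(data []string, autoClose bool) chan []byte {
	out := make(chan []byte)
	go func() {
		for _, s := range data {
			out <- []byte(s)
		}
		if autoClose {
			close(out)
		}
	}()
	return out
}

// channelAsSlice sketches the inverse helper: it drains in until the
// channel is closed and returns every message received as a string.
func channelAsSlice(in <-chan []byte) []string {
	var got []string
	for msg := range in {
		got = append(got, string(msg))
	}
	return got
}

func main() {
	// Round-trip: slice -> channel -> slice.
	round := channelAsSlice(sliceAsChannel([]string{"a", "b", "c"}, true))
	fmt.Println(round) // [a b c]
}
```

Note that channelAsSlice only terminates if the channel is eventually closed, which is why autoClose matters when the two are combined.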
Types ¶
type Broadcaster ¶
type Broadcaster struct {
	DataCounter
	// contains filtered or unexported fields
}
Broadcaster allows writing the same value to multiple goroutines
func (*Broadcaster) Broadcast ¶
func (b *Broadcaster) Broadcast(input <-chan []byte)
Broadcast reads values from input and sends them to the output channels
func (*Broadcaster) Client ¶
func (b *Broadcaster) Client() <-chan []byte
Client creates an output channel; it panics if Broadcast has already been called
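The register-then-broadcast contract above can be sketched as a minimal fan-out: clients are created before broadcasting starts, then every input message is copied to each client channel. The lowercase broadcaster below is an assumed illustration of the pattern, not the package's actual implementation:

```go
package main

import "fmt"

// broadcaster sketches the fan-out described above: Client registers output
// channels up front; Broadcast then copies every input message to each
// registered channel and closes them all when input is exhausted.
type broadcaster struct {
	clients []chan []byte
	started bool
}

func (b *broadcaster) Client() <-chan []byte {
	if b.started {
		panic("Client called after Broadcast")
	}
	ch := make(chan []byte)
	b.clients = append(b.clients, ch)
	return ch
}

func (b *broadcaster) Broadcast(input <-chan []byte) {
	b.started = true
	for msg := range input {
		for _, c := range b.clients {
			c <- msg // blocks until this client consumes the message
		}
	}
	for _, c := range b.clients {
		close(c)
	}
}

func main() {
	var b broadcaster
	c1, c2 := b.Client(), b.Client()

	in := make(chan []byte, 2)
	in <- []byte("x")
	in <- []byte("y")
	close(in)

	done := make(chan struct{})
	go func() {
		for msg := range c1 {
			fmt.Println("c1:", string(msg))
		}
		close(done)
	}()
	go b.Broadcast(in)
	for msg := range c2 {
		fmt.Println("c2:", string(msg))
	}
	<-done
}
```

Because sends to clients are unbuffered, one slow client stalls the whole broadcast; that back-pressure is a common, deliberate property of this pattern.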
type DataCounter ¶ added in v0.3.0
type DataCounter struct {
// contains filtered or unexported fields
}
DataCounter is a simple atomic wrapper
func (*DataCounter) Stats ¶ added in v0.3.0
func (c *DataCounter) Stats() (count int64, data int64)
func (*DataCounter) SumData ¶ added in v0.3.0
func (c *DataCounter) SumData(msg []byte)
SumData increments count by 1 and data by len(msg). Although each value is incremented atomically, it is possible to get inconsistent reads from Stats while the object is in use
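The caveat above follows from using two independent atomics rather than one lock. A minimal sketch of the same shape (lowercase dataCounter, an assumed illustration, not the package's source):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// dataCounter sketches the wrapper described above: SumData bumps the
// message count by one and the byte total by len(msg). Each field is
// atomic on its own, so Stats may observe a pair taken between the two
// updates while the counter is in concurrent use.
type dataCounter struct {
	count int64
	data  int64
}

func (c *dataCounter) SumData(msg []byte) {
	atomic.AddInt64(&c.count, 1)
	atomic.AddInt64(&c.data, int64(len(msg)))
}

func (c *dataCounter) Stats() (count, data int64) {
	return atomic.LoadInt64(&c.count), atomic.LoadInt64(&c.data)
}

func main() {
	var c dataCounter
	c.SumData([]byte("hello"))
	c.SumData([]byte("hi"))
	count, data := c.Stats()
	fmt.Println(count, data) // 2 7
}
```

Trading a mutex for two atomics keeps the hot path cheap; the cost is exactly the documented possibility of a momentarily inconsistent count/data pair.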
type MultiError ¶
MultiError is an error that aggregates all of the pipeline's Node.Start errors
type Node ¶
type Node struct {
	Name string
	// contains filtered or unexported fields
}
Node is a node that can send and receive data
func (*Node) Chain ¶
Chain sends messages emitted by the worker to the next node; it returns the next node so calls can be chained again
func (*Node) Start ¶
Start initializes the worker; worker.Process will be called repeatedly until the Node is stopped or worker.Process returns an error
type Pipeliner ¶
Pipeliner all implementations must meet the following conditions:
- Run must call Node.Start on all Nodes
- the context passed to Run must be propagated to all Node.Start methods
- Nodes() must return a slice with all instances of *Node
func NewSimplePipeline ¶
NewSimplePipeline creates a linear pipeline
type ProcessArgs ¶ added in v0.3.0
type Receiver ¶
type Receiver struct {
	DataCounter
	// contains filtered or unexported fields
}
Receiver joins multiple channels into a single output channel; this allows adding new channels after Receive is called
type SimplePipeline ¶
type SimplePipeline struct {
// contains filtered or unexported fields
}
SimplePipeline's zero value is unusable; you must create it with NewSimplePipeline
func (*SimplePipeline) Nodes ¶
func (p *SimplePipeline) Nodes() []*Node
Nodes returns all instances of *Node
func (*SimplePipeline) Run ¶
func (p *SimplePipeline) Run(ctx context.Context) error
Run starts pipeline processing; it returns a non-nil error if any Node fails
func (*SimplePipeline) Stats ¶
func (p *SimplePipeline) Stats() map[string]Stats
Stats returns a map with each node's Stats object
type Worker ¶
type Worker interface {
	// Process must close the write-only channel
	Process(ctx context.Context, args ProcessArgs) error
}
Worker is the standard interface implemented by processors; it is used to build pipeline nodes. All Worker implementations must meet the following conditions:
- on input channel close, Process must finish its work gracefully and return nil
- on context cancellation, Process must finish ASAP and return context.Canceled
- on finish, Process must close the output channel and return an error or nil