Documentation ¶
Overview ¶
TODO explain here how to write wrappers to use without casting from `interface{}`.
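One possible shape for such a wrapper, offered as a sketch rather than the package's own approach: embed `Reader` in a small typed struct and do the type assertion in one place. `IntReader`, `ReadInt`, `sliceReader` and `sumAll` are hypothetical names invented for this example, and the `Reader` interface is redeclared locally only so the snippet compiles on its own.

```go
package main

import (
	"context"
	"fmt"
	"io"
)

// Reader mirrors the package's Reader interface so the sketch is self-contained.
type Reader interface {
	GetContext() context.Context
	Read() (interface{}, error)
	Close() error
}

// sliceReader is a hypothetical in-memory Reader used only by this example.
type sliceReader struct {
	entries []interface{}
	pos     int
}

func (s *sliceReader) GetContext() context.Context { return context.Background() }

func (s *sliceReader) Read() (interface{}, error) {
	if s.pos >= len(s.entries) {
		return nil, io.EOF
	}
	entry := s.entries[s.pos]
	s.pos++
	return entry, nil
}

func (s *sliceReader) Close() error { return nil }

// IntReader is a hypothetical typed wrapper: the type assertion lives here,
// so processors can work with int values directly.
type IntReader struct {
	Reader
}

// ReadInt returns the next entry as an int, or an error if the entry has a
// different type. io.EOF is passed through unchanged.
func (r IntReader) ReadInt() (int, error) {
	entry, err := r.Read()
	if err != nil {
		return 0, err
	}
	v, ok := entry.(int)
	if !ok {
		return 0, fmt.Errorf("expected int, got %T", entry)
	}
	return v, nil
}

// sumAll reads every entry through the typed wrapper and adds the values up.
func sumAll(r IntReader) (int, error) {
	defer r.Close()
	sum := 0
	for {
		v, err := r.ReadInt()
		if err == io.EOF {
			return sum, nil
		}
		if err != nil {
			return 0, err
		}
		sum += v
	}
}

func main() {
	sum, err := sumAll(IntReader{&sliceReader{entries: []interface{}{1, 2, 3}}})
	fmt.Println(sum, err)
}
```

The same pattern would apply to `Writer`: a wrapper with a typed `WriteInt(int) error` method that forwards to `Write`.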
Index ¶
- type BufferedReadWriter
- type Pipeline
- func (p *Pipeline) AddPostProcessingHook(hook func(context.Context, error) error)
- func (p *Pipeline) AddPreProcessingHook(hook func(context.Context) (context.Context, error))
- func (p *Pipeline) PrintStatus()
- func (p *Pipeline) Process(reader Reader) <-chan error
- func (p *Pipeline) SetRoot(rootProcessor *PipelineNode)
- func (p *Pipeline) Shutdown()
- type PipelineInterface
- type PipelineNode
- type Processor
- type Reader
- type Store
- type Writer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BufferedReadWriter ¶
type BufferedReadWriter struct {
// contains filtered or unexported fields
}
BufferedReadWriter implements Reader and Writer and acts like a pipe. All writes are queued in a buffered channel until they are consumed.
Used internally by Pipeline but also helpful for testing.
func (*BufferedReadWriter) Close ¶
func (b *BufferedReadWriter) Close() error
Close can be called in both `Writer` and `Reader` contexts.
In the `Reader` context it means that no more values will be read, so the writer can stop writing to the buffer (subsequent calls to `Write()` return `io.ErrClosedPipe`).
In the `Writer` context it means that no more values will be written, so the reader should start returning `io.EOF` once all queued values have been returned.
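The two closing behaviours can be illustrated with a toy channel-backed pipe. This is a simplified sketch, not the actual `BufferedReadWriter` implementation: the `pipe` type is hypothetical, and it splits closing into two explicit methods (`closeWriter`, `closeReader`) instead of a single `Close()` purely to keep the example short.

```go
package main

import (
	"fmt"
	"io"
)

// pipe is a hypothetical, simplified stand-in for BufferedReadWriter.
// It assumes a single writer and that each close method is called at most once.
type pipe struct {
	queue    chan interface{} // buffered queue of written entries
	readDone chan struct{}    // closed once the reading side is finished
}

func newPipe(size int) *pipe {
	return &pipe{queue: make(chan interface{}, size), readDone: make(chan struct{})}
}

// Write queues an entry, or returns io.ErrClosedPipe if the reader has closed.
func (p *pipe) Write(entry interface{}) error {
	// Check the reader state first so a closed reader always wins.
	select {
	case <-p.readDone:
		return io.ErrClosedPipe
	default:
	}
	select {
	case <-p.readDone:
		return io.ErrClosedPipe
	case p.queue <- entry:
		return nil
	}
}

// Read returns queued entries, then io.EOF once the writer has closed.
func (p *pipe) Read() (interface{}, error) {
	entry, ok := <-p.queue
	if !ok {
		return nil, io.EOF
	}
	return entry, nil
}

// closeWriter signals that no more values will be written.
func (p *pipe) closeWriter() { close(p.queue) }

// closeReader signals that no more values will be read.
func (p *pipe) closeReader() { close(p.readDone) }

func main() {
	p := newPipe(2)
	p.Write("a")
	p.Write("b")
	p.closeWriter()
	for {
		entry, err := p.Read()
		if err == io.EOF {
			break
		}
		fmt.Println(entry) // queued entries are still delivered after closeWriter
	}

	q := newPipe(2)
	q.closeReader()
	fmt.Println(q.Write("c")) // the writer learns that nobody is reading
}
```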
func (*BufferedReadWriter) GetContext ¶
func (b *BufferedReadWriter) GetContext() context.Context
func (*BufferedReadWriter) QueuedEntries ¶
func (b *BufferedReadWriter) QueuedEntries() int
func (*BufferedReadWriter) Read ¶
func (b *BufferedReadWriter) Read() (interface{}, error)
func (*BufferedReadWriter) Write ¶
func (b *BufferedReadWriter) Write(entry interface{}) error
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
func New ¶
func New(rootProcessor *PipelineNode) *Pipeline
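func (*Pipeline) AddPostProcessingHook ¶
func (p *Pipeline) AddPostProcessingHook(hook func(context.Context, error) error)
func (*Pipeline) AddPreProcessingHook ¶
func (p *Pipeline) AddPreProcessingHook(hook func(context.Context) (context.Context, error))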
func (*Pipeline) PrintStatus ¶
func (p *Pipeline) PrintStatus()
func (*Pipeline) SetRoot ¶
func (p *Pipeline) SetRoot(rootProcessor *PipelineNode)
type PipelineInterface ¶
type PipelineInterface interface {
	SetRoot(rootProcessor *PipelineNode)
	// AddPreProcessingHook adds a pre-processing hook function. Returned
	// context.Context will be passed to the processors. If error is returned
	// pipeline will not start processing data.
	AddPreProcessingHook(hook func(context.Context) (context.Context, error))
	AddPostProcessingHook(hook func(context.Context, error) error)
	Shutdown()
	PrintStatus()
}
PipelineInterface is an interface that defines common pipeline methods in structs that embed Pipeline.
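As an illustration of the two hook signatures, the sketch below defines one hook of each kind. The names `ctxKey`, `preHook`, `postHook`, `runHooks` and `errBoom` are hypothetical, and `runHooks` calls the hooks directly only to exercise them; in real use they would be registered with `AddPreProcessingHook` and `AddPostProcessingHook`. Whether the post-processing hook receives the context returned by the pre-processing hook is an assumption made here, not something this page states.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ctxKey is a hypothetical private key type for values stored in the context.
type ctxKey string

// errBoom is a hypothetical processing error used by the demonstration.
var errBoom = errors.New("boom")

// preHook matches the AddPreProcessingHook signature. It attaches a value
// that processors could later read from the context they receive.
func preHook(ctx context.Context) (context.Context, error) {
	return context.WithValue(ctx, ctxKey("run"), 100), nil
}

// postHook matches the AddPostProcessingHook signature. It annotates a
// processing error with the value stored by preHook.
func postHook(ctx context.Context, err error) error {
	if err != nil {
		return fmt.Errorf("run %v failed: %w", ctx.Value(ctxKey("run")), err)
	}
	return nil
}

// runHooks calls both hooks in order, standing in for the pipeline.
// ASSUMPTION: the post hook sees the context produced by the pre hook.
func runHooks(processingErr error) error {
	ctx, err := preHook(context.Background())
	if err != nil {
		return err // per the docs, processing would not start in this case
	}
	return postHook(ctx, processingErr)
}

func main() {
	fmt.Println(runHooks(nil))
	fmt.Println(runHooks(errBoom))
}
```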
type PipelineNode ¶
type PipelineNode struct {
	// Remember to update reset() method if you ever add a new field to this struct!
	Processor Processor
	Children  []*PipelineNode
	// contains filtered or unexported fields
}
func Node ¶
func Node(processor Processor) *PipelineNode
func (*PipelineNode) Pipe ¶
func (p *PipelineNode) Pipe(children ...*PipelineNode) *PipelineNode
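`Node` and `Pipe` look designed to be chained when building a processor tree. The sketch below redeclares simplified local versions so it can run on its own: `Processor` is reduced to `Name()`, and the body of `Pipe` (set the children, return the receiver) is an assumption inferred from the signature, not taken from the package source. `named` and `describe` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strings"
)

// Processor is reduced to Name() for this sketch; the real interface has more methods.
type Processor interface {
	Name() string
}

// PipelineNode mirrors the exported fields shown above.
type PipelineNode struct {
	Processor Processor
	Children  []*PipelineNode
}

// Node wraps a processor in a tree node.
func Node(processor Processor) *PipelineNode {
	return &PipelineNode{Processor: processor}
}

// Pipe attaches children and returns the receiver so calls can be chained.
// ASSUMPTION: this behaviour is inferred from the signature only.
func (p *PipelineNode) Pipe(children ...*PipelineNode) *PipelineNode {
	p.Children = children
	return p
}

// named is a hypothetical processor that only has a name.
type named string

func (n named) Name() string { return string(n) }

// describe renders the tree, one node per line, indented by depth.
func describe(n *PipelineNode, depth int) string {
	out := strings.Repeat("  ", depth) + n.Processor.Name() + "\n"
	for _, child := range n.Children {
		out += describe(child, depth+1)
	}
	return out
}

func main() {
	root := Node(named("filter")).Pipe(
		Node(named("aggregate")).Pipe(Node(named("report"))),
		Node(named("archive")),
	)
	fmt.Print(describe(root, 0))
}
```

A tree built this way would then be handed to `New` or `SetRoot`.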
type Processor ¶
type Processor interface {
	// Process is the main method of `Processor`. It receives a `Reader`
	// that contains objects passed down the pipeline from the previous processor.
	// Writes to `Writer` will be passed to the next processor. WARNING! `Process`
	// should **always** call `Close()` on `Writer` when no more objects will be
	// written and `Close()` on `Reader` when reading is finished.
	// Data required by following processors (like aggregated data) should be saved in
	// `Store`. Read `Store` godoc to understand how to use it.
	// The first argument `ctx` is a context with cancel. Processor should monitor
	// the `ctx.Done()` channel and exit when it is closed. This can happen when
	// pipeline execution is interrupted, e.g. due to an error.
	//
	// Given all the information above, `Process` should always look like this:
	//
	//    func (p *Processor) Process(ctx context.Context, store *pipeline.Store, r Reader, w Writer) error {
	//    	defer r.Close()
	//    	defer w.Close()
	//
	//    	// Some pre code...
	//
	//    	for {
	//    		entry, err := r.Read()
	//    		if err != nil {
	//    			if err == io.EOF {
	//    				break
	//    			}
	//    			return errors.Wrap(err, "Error reading from Reader in [ProcessorName]")
	//    		}
	//
	//    		// Process entry...
	//
	//    		// Write to Writer if needed but exit if pipe is closed:
	//    		err = w.Write(entry)
	//    		if err != nil {
	//    			if err == io.ErrClosedPipe {
	//    				// Reader does not need more data
	//    				return nil
	//    			}
	//    			return errors.Wrap(err, "Error writing to Writer in [ProcessorName]")
	//    		}
	//
	//    		// Return errors if needed...
	//
	//    		// Exit when pipeline terminated due to an error in another processor...
	//    		select {
	//    		case <-ctx.Done():
	//    			return nil
	//    		default:
	//    			continue
	//    		}
	//    	}
	//
	//    	// Some post code...
	//
	//    	return nil
	//    }
	Process(context.Context, *Store, Reader, Writer) error
	// Name returns the processor name. Helpful for errors, debugging and reports.
	Name() string
	// Reset resets the internal state of the processor. It is run by the pipeline
	// every time before the pipeline starts running.
	// It is extremely important to implement this method, otherwise the internal
	// state of the processor will be maintained between pipeline runs and may
	// result in invalid data.
	Reset()
}
Processor defines methods required by the processing pipeline.
type Reader ¶
type Reader interface {
	// GetContext returns context with values of the current reader. Can be
	// helpful to provide data to structs that wrap `Reader`.
	GetContext() context.Context
	// Read should return next entry. If there are no more
	// entries it should return `io.EOF` error.
	Read() (interface{}, error)
	// Close should be called when reading is finished. This is especially
	// helpful when there are still some entries available so reader can stop
	// streaming them.
	Close() error
}
Reader interface placeholder
type Store ¶
Store allows storing data connected to pipeline execution. It exposes `Lock()` and `Unlock()` methods that must be called when accessing the `Store` for both `Put` and `Get` calls.
Example (incrementing a number):
    s.Lock()
    v := s.Get("value")
    s.Put("value", v.(int)+1)
    s.Unlock()
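To show why the lock has to span both the `Get` and the `Put`, here is a self-contained sketch with a hypothetical `store` type (a mutex plus a map). The real `Store` has unexported fields and may be implemented differently; the `Get` and `Put` signatures used here are inferred from the example above.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical stand-in for Store: a mutex guarding a map.
type store struct {
	sync.Mutex
	values map[string]interface{}
}

// Put saves a value. Callers are expected to hold the lock.
func (s *store) Put(name string, value interface{}) {
	if s.values == nil {
		s.values = make(map[string]interface{})
	}
	s.values[name] = value
}

// Get loads a value, or nil if it is missing. Callers are expected to hold the lock.
func (s *store) Get(name string) interface{} {
	return s.values[name]
}

// increment performs the read-modify-write as one critical section, so
// concurrent callers cannot interleave between Get and Put.
func increment(s *store, name string) {
	s.Lock()
	defer s.Unlock()
	v, _ := s.Get(name).(int) // a missing value counts as zero
	s.Put(name, v+1)
}

// countConcurrently increments one counter from n goroutines and returns the result.
func countConcurrently(n int) int {
	s := &store{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			increment(s, "value")
		}()
	}
	wg.Wait()

	s.Lock()
	defer s.Unlock()
	v, _ := s.Get("value").(int)
	return v
}

func main() {
	fmt.Println(countConcurrently(100))
}
```

Unlike the example above, `increment` uses the two-value form of the type assertion, so the first call does not panic when the key has not been stored yet.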
type Writer ¶
type Writer interface {
	// Write is used to pass entry to the next processor. It can return
	// `io.ErrClosedPipe` when the pipe between processors has been closed meaning
	// that next processor does not need more data. In such situation the current
	// processor can terminate as sending more entries to a `Writer`
	// does not make sense (will not be read).
	Write(interface{}) error
	// Close should be called when there are no more entries
	// to write.
	Close() error
}
Writer interface placeholder
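The `io.ErrClosedPipe` contract can be exercised in a test with a small in-memory `Writer`. In the sketch below, `limitWriter` and `emit` are hypothetical: the writer accepts a fixed number of entries and then reports a closed pipe, and `emit` treats that error as a normal signal to stop rather than as a failure. The `Writer` interface is redeclared locally so the snippet is self-contained.

```go
package main

import (
	"fmt"
	"io"
)

// Writer mirrors the package's Writer interface so the sketch is self-contained.
type Writer interface {
	Write(interface{}) error
	Close() error
}

// limitWriter is a hypothetical test double: it accepts at most limit entries
// and then behaves like a pipe whose reader has gone away.
type limitWriter struct {
	limit   int
	entries []interface{}
	closed  bool
}

func (w *limitWriter) Write(entry interface{}) error {
	if w.closed || len(w.entries) >= w.limit {
		return io.ErrClosedPipe
	}
	w.entries = append(w.entries, entry)
	return nil
}

func (w *limitWriter) Close() error {
	w.closed = true
	return nil
}

// emit writes values until they run out or the next processor stops reading.
// It returns how many values were accepted; io.ErrClosedPipe is not an error here.
func emit(w Writer, values []int) (int, error) {
	defer w.Close()
	for i, v := range values {
		if err := w.Write(v); err != nil {
			if err == io.ErrClosedPipe {
				return i, nil // downstream does not need more data
			}
			return i, err
		}
	}
	return len(values), nil
}

func main() {
	w := &limitWriter{limit: 2}
	n, err := emit(w, []int{1, 2, 3, 4})
	fmt.Println(n, err, w.entries, w.closed)
}
```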