Package pipeline
v0.0.0-...-c9303cd
Warning

This package is not in the latest version of its module.

Published: Dec 24, 2020 | License: Apache-2.0 | Imports: 7 | Imported by: 0

Documentation

Overview

TODO: explain here how to write wrappers that avoid casting from `interface{}`.

Types

type BufferedReadWriter

type BufferedReadWriter struct {
	// contains filtered or unexported fields
}

BufferedReadWriter implements Reader and Writer and acts like a pipe. All writes are queued in a buffered channel until they are consumed.

It is used internally by Pipeline but is also helpful for testing.

func (*BufferedReadWriter) Close

func (b *BufferedReadWriter) Close() error

Close can be called in both a `Writer` and a `Reader` context.

In a `Reader` context, it means that no more values will be read, so the writer can stop writing to the buffer (subsequent calls to `Write()` return `io.ErrClosedPipe`).

In a `Writer` context, it means that no more values will be written, so the reader should start returning an `io.EOF` error after it has returned all queued values.

func (*BufferedReadWriter) GetContext

func (b *BufferedReadWriter) GetContext() context.Context

func (*BufferedReadWriter) QueuedEntries

func (b *BufferedReadWriter) QueuedEntries() int

func (*BufferedReadWriter) Read

func (b *BufferedReadWriter) Read() (interface{}, error)

func (*BufferedReadWriter) Write

func (b *BufferedReadWriter) Write(entry interface{}) error

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func New

func New(rootProcessor *PipelineNode) *Pipeline

func (*Pipeline) AddPostProcessingHook

func (p *Pipeline) AddPostProcessingHook(hook func(context.Context, error) error)

func (*Pipeline) AddPreProcessingHook

func (p *Pipeline) AddPreProcessingHook(hook func(context.Context) (context.Context, error))

func (*Pipeline) PrintStatus

func (p *Pipeline) PrintStatus()

func (*Pipeline) Process

func (p *Pipeline) Process(reader Reader) <-chan error

func (*Pipeline) SetRoot

func (p *Pipeline) SetRoot(rootProcessor *PipelineNode)

func (*Pipeline) Shutdown

func (p *Pipeline) Shutdown()

Shutdown stops processing. Note that post-processing hooks are not executed when Shutdown() is called.

type PipelineInterface

type PipelineInterface interface {
	SetRoot(rootProcessor *PipelineNode)
	// AddPreProcessingHook adds a pre-processing hook function. The returned
	// context.Context will be passed to the processors. If an error is returned,
	// the pipeline will not start processing data.
	AddPreProcessingHook(hook func(context.Context) (context.Context, error))
	AddPostProcessingHook(hook func(context.Context, error) error)
	Shutdown()
	PrintStatus()
}

PipelineInterface defines the common pipeline methods for structs that embed Pipeline.

type PipelineNode

type PipelineNode struct {
	// Remember to update reset() method if you ever add a new field to this struct!
	Processor Processor
	Children  []*PipelineNode
	// contains filtered or unexported fields
}

func Node

func Node(processor Processor) *PipelineNode

func (*PipelineNode) Pipe

func (p *PipelineNode) Pipe(children ...*PipelineNode) *PipelineNode
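Node and Pipe build the processor tree whose root is passed to New. The sketch below is self-contained and mirrors the exported `Processor` and `Children` fields with local types. The names `node`, `newNode`, `pipe`, `names`, `named` and the processor names are hypothetical. It assumes that Pipe sets the receiver's children and returns the receiver, so calls can be nested. With the real package, the shape would presumably be `pipeline.Node(a).Pipe(pipeline.Node(b), pipeline.Node(c))`.

```go
package main

// processor stands in for pipeline.Processor; only Name is needed here.
type processor interface {
	Name() string
}

// named is a hypothetical processor identified only by its name.
type named string

func (n named) Name() string { return string(n) }

// node mirrors the exported fields of PipelineNode.
type node struct {
	Processor processor
	Children  []*node
}

// newNode mirrors Node: it wraps a processor in a tree node.
func newNode(p processor) *node {
	return &node{Processor: p}
}

// pipe mirrors Pipe under the stated assumption: it sets the children
// and returns the receiver for nesting.
func (n *node) pipe(children ...*node) *node {
	n.Children = children
	return n
}

// names walks the tree depth-first and returns processor names in visit order.
func names(n *node) []string {
	out := []string{n.Processor.Name()}
	for _, c := range n.Children {
		out = append(out, names(c)...)
	}
	return out
}
```

A depth-first walk is enough to show the structure. This documentation does not say how the real pipeline schedules sibling processors.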

type Processor

type Processor interface {
	// Process is the main method of `Processor`. It receives a `Reader`
	// that contains objects passed down the pipeline by the previous processor.
	// Writes to `Writer` will be passed to the next processor. WARNING! `Process`
	// should **always** call `Close()` on `Writer` when no more objects will be
	// written and `Close()` on `Reader` when reading is finished.
	// Data required by following processors (like aggregated data) should be saved in
	// `Store`. Read the `Store` godoc to understand how to use it.
	// The first argument, `ctx`, is a context with cancel. The processor should monitor
	// the `ctx.Done()` channel and exit when it is closed. This can happen when
	// pipeline execution is interrupted, e.g. due to an error.
	//
	// Given all the information above, `Process` should always look like this:
	//
	//    func (p *MyProcessor) Process(ctx context.Context, store *pipeline.Store, r pipeline.Reader, w pipeline.Writer) error {
	//    	defer r.Close()
	//    	defer w.Close()
	//
	//    	// Some pre code...
	//
	//    	for {
	//    		entry, err := r.Read()
	//    		if err != nil {
	//    			if err == io.EOF {
	//    				break
	//    			}
	//    			return errors.Wrap(err, "Error reading from Reader in [ProcessorName]")
	//    		}
	//
	//    		// Process entry...
	//
	//    		// Write to Writer if needed, but exit if the pipe is closed:
	//    		err = w.Write(entry)
	//    		if err != nil {
	//    			if err == io.ErrClosedPipe {
	//    				// Reader does not need more data
	//    				return nil
	//    			}
	//    			return errors.Wrap(err, "Error writing to Writer in [ProcessorName]")
	//    		}
	//
	//    		// Return errors if needed...
	//
	//    		// Exit when the pipeline is terminated due to an error in another processor:
	//    		select {
	//    		case <-ctx.Done():
	//    			return nil
	//    		default:
	//    			continue
	//    		}
	//    	}
	//
	//    	// Some post code...
	//
	//    	return nil
	//    }
	Process(context.Context, *Store, Reader, Writer) error
	// Name returns the processor name. Helpful for errors, debugging and reports.
	Name() string
	// Reset resets the internal state of the processor. The pipeline calls it
	// every time before it starts running.
	// It is extremely important to implement this method; otherwise the internal
	// state of the processor will be kept between pipeline runs and may
	// result in invalid data.
	Reset()
}

Processor defines methods required by the processing pipeline.

type Reader

type Reader interface {
	// GetContext returns a context with the values of the current reader. It can
	// be helpful for providing data to structs that wrap `Reader`.
	GetContext() context.Context
	// Read should return the next entry. If there are no more
	// entries, it should return an `io.EOF` error.
	Read() (interface{}, error)
	// Close should be called when reading is finished. This is especially
	// helpful when there are still some entries available, so the reader can
	// stop streaming them.
	Close() error
}

Reader is the interface a processor uses to receive entries from the previous processor.

type Store

type Store struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Store allows storing data connected to pipeline execution. It exposes `Lock()` and `Unlock()` methods that must be called around every `Put` and `Get` call when accessing the `Store`.

Example (incrementing a number):

    s.Lock()
    v := s.Get("value")
    s.Put("value", v.(int)+1)
    s.Unlock()
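The locking rule is easier to see in a self-contained sketch. The `store` type mirrors the documented shape of Store (an embedded `sync.Mutex` plus `Get` and `Put`). Its map field and the `increment` and `incrementConcurrently` helpers are hypothetical, because the real fields are unexported. Unlike the example above, `increment` treats a missing value as 0 instead of panicking on `v.(int)`.

```go
package main

import "sync"

// store mirrors pipeline.Store's documented shape. Get and Put do not lock
// on their own, so callers must hold the lock, as Store requires.
type store struct {
	sync.Mutex
	values map[string]interface{} // assumed storage; the real fields are unexported
}

func (s *store) Get(name string) interface{} {
	return s.values[name] // reading from a nil map returns nil
}

func (s *store) Put(name string, value interface{}) {
	if s.values == nil {
		s.values = make(map[string]interface{})
	}
	s.values[name] = value
}

// increment adds 1 to the integer stored under name while holding the lock.
// A missing or non-int value is treated as 0.
func increment(s *store, name string) {
	s.Lock()
	defer s.Unlock()
	v, _ := s.Get(name).(int)
	s.Put(name, v+1)
}

// incrementConcurrently runs n goroutines that each increment name once.
func incrementConcurrently(s *store, name string, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			increment(s, name)
		}()
	}
	wg.Wait()
}
```

Holding the lock across the whole Get-then-Put sequence is what makes the increment atomic. Locking inside Get and Put separately would still lose updates under concurrency.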

func (*Store) Get

func (s *Store) Get(name string) interface{}

func (*Store) Put

func (s *Store) Put(name string, value interface{})

type Writer

type Writer interface {
	// Write is used to pass an entry to the next processor. It can return
	// `io.ErrClosedPipe` when the pipe between processors has been closed, meaning
	// that the next processor does not need more data. In that case the current
	// processor can terminate, because further entries sent to the `Writer`
	// will not be read.
	Write(interface{}) error
	// Close should be called when there are no more entries
	// to write.
	Close() error
}

Writer is the interface a processor uses to pass entries to the next processor.
