pipeline

package
Version: v0.0.0-...-fb9541f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 27, 2020 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetLedgerHeaderFromContext

func GetLedgerHeaderFromContext(ctx context.Context) xdr.LedgerHeaderHistoryEntry

func GetLedgerSequenceFromContext

func GetLedgerSequenceFromContext(ctx context.Context) uint32

func GetLedgerUpgradeChangesFromContext

func GetLedgerUpgradeChangesFromContext(ctx context.Context) []io.Change

func LedgerNode

func LedgerNode(processor LedgerProcessor) *supportPipeline.PipelineNode

func StateNode

func StateNode(processor StateProcessor) *supportPipeline.PipelineNode

Types

type ContextKey

type ContextKey string
const (
	LedgerSequenceContextKey       ContextKey = "ledger_sequence"
	LedgerHeaderContextKey         ContextKey = "ledger_header"
	LedgerUpgradeChangesContextKey ContextKey = "ledger_upgrade_changes"
)

type LedgerPipeline

type LedgerPipeline struct {
	supportPipeline.Pipeline
}

func (*LedgerPipeline) Process

func (p *LedgerPipeline) Process(reader io.LedgerReader) <-chan error

type LedgerProcessor

type LedgerProcessor interface {
	// ProcessLedger is the main method of `LedgerProcessor`. It receives `io.LedgerReader`
	// that contains objects passed down the pipeline from the previous processor. Writes to
	// `io.LedgerWriter` will be passed to the next processor. WARNING! `ProcessLedger`
	// should **always** call `Close()` on `io.LedgerWriter` when no more objects will be
	// written and `Close()` on `io.LedgerReader` when reading is finished.
	// Data required by following processors (like aggregated data) should be saved in
	// `Store`. Read `Store` godoc to understand how to use it.
	// The first argument `ctx` is a context with cancel. Processor should monitor
	// `ctx.Done()` channel and exit when it is closed. This can happen when
	// pipeline execution is interrupted, e.g. due to an error.
	// Please note that a processor can filter transactions (by not passing them to
	// `io.LedgerWriter`) but it cannot filter ledger upgrade changes
	// (`io.LedgerReader.ReadUpgradeChange`). All upgrade changes will be available
	// for the next processor to read.
	//
	// Given all information above `ProcessLedger` should always look like this:
	//
	//    func (p *Processor) ProcessLedger(ctx context.Context, store *pipeline.Store, r io.LedgerReader, w io.LedgerWriter) (err error) {
	//    	defer func() {
	//    		// io.LedgerReader.Close() returns error if upgrade changes have not
	//    		// been processed so it's worth checking the error.
	//    		closeErr := r.Close()
	//    		// Do not overwrite the previous error
	//    		if err == nil {
	//    			err = closeErr
	//    		}
	//    	}()
	//    	defer w.Close()
	//
	//    	// Some pre code...
	//
	//    	for {
	//    		entry, err := r.Read()
	//    		if err != nil {
	//    			if err == io.EOF {
	//    				break
	//    			} else {
	//    				return errors.Wrap(err, "Error reading from LedgerReader in [ProcessorName]")
	//    			}
	//    		}
	//
	//    		// Process entry...
	//
	//    		// Write to LedgerWriter if needed but exit if pipe is closed:
	//    		err = w.Write(entry)
	//    		if err != nil {
	//    			if err == io.ErrClosedPipe {
	//    				//    Reader does not need more data
	//    				return nil
	//    			}
	//    			return errors.Wrap(err, "Error writing to LedgerWriter in [ProcessorName]")
	//    		}
	//
	//    		// Return errors if needed...
	//
	//    		// Exit when pipeline terminated due to an error in another processor...
	//    		select {
	//    		case <-ctx.Done():
	//    			return nil
	//    		default:
	//    			continue
	//    		}
	//    	}
	//
	//    	for {
	//    		change, err := r.ReadUpgradeChange()
	//    		if err != nil {
	//    			if err == stdio.EOF {
	//    				break
	//    			} else {
	//    				return err
	//    			}
	//    		}
	//
	//    		// Process ledger upgrade change...
	//    	}
	//
	//    	// Some post code...
	//
	//    	return nil
	//    }
	ProcessLedger(context.Context, *supportPipeline.Store, io.LedgerReader, io.LedgerWriter) error
	// Name returns the processor name. Helpful for errors, debugging and reports.
	Name() string
	// Reset resets internal state of the processor. This is run by the pipeline
	// every time the processing is done. It is extremely important to implement
	// this method, otherwise internal state of the processor will be maintained
	// between pipeline runs and may result in invalid data.
	Reset()
}

LedgerProcessor defines the methods required by the ledger processing pipeline.

type StatePipeline

type StatePipeline struct {
	supportPipeline.Pipeline
}

func (*StatePipeline) Process

func (p *StatePipeline) Process(reader io.StateReader) <-chan error

type StateProcessor

type StateProcessor interface {
	// ProcessState is the main method of `StateProcessor`. It receives `io.StateReader`
	// that contains objects passed down the pipeline from the previous processor. Writes to
	// `io.StateWriter` will be passed to the next processor. WARNING! `ProcessState`
	// should **always** call `Close()` on `io.StateWriter` when no more objects will be
	// written and `Close()` on `io.StateReader` when reading is finished.
	// Data required by following processors (like aggregated data) should be saved in
	// `Store`. Read `Store` godoc to understand how to use it.
	// The first argument `ctx` is a context with cancel. Processor should monitor
	// `ctx.Done()` channel and exit when it is closed. This can happen when
	// pipeline execution is interrupted, e.g. due to an error.
	//
	// Given all information above `ProcessState` should always look like this:
	//
	//    func (p *Processor) ProcessState(ctx context.Context, store *pipeline.Store, r io.StateReader, w io.StateWriter) error {
	//    	defer r.Close()
	//    	defer w.Close()
	//
	//    	// Some pre code...
	//
	//    	for {
	//    		entry, err := r.Read()
	//    		if err != nil {
	//    			if err == io.EOF {
	//    				break
	//    			} else {
	//    				return errors.Wrap(err, "Error reading from StateReader in [ProcessorName]")
	//    			}
	//    		}
	//
	//    		// Process entry...
	//
	//    		// Write to StateWriter if needed but exit if pipe is closed:
	//    		err = w.Write(entry)
	//    		if err != nil {
	//    			if err == io.ErrClosedPipe {
	//    				//    Reader does not need more data
	//    				return nil
	//    			}
	//    			return errors.Wrap(err, "Error writing to StateWriter in [ProcessorName]")
	//    		}
	//
	//    		// Return errors if needed...
	//
	//    		// Exit when pipeline terminated due to an error in another processor...
	//    		select {
	//    		case <-ctx.Done():
	//    			return nil
	//    		default:
	//    			continue
	//    		}
	//    	}
	//
	//    	// Some post code...
	//
	//    	return nil
	//    }
	ProcessState(context.Context, *supportPipeline.Store, io.StateReader, io.StateWriter) error
	// Name returns the processor name. Helpful for errors, debugging and reports.
	Name() string
	// Reset resets internal state of the processor. This is run by the pipeline
	// every time the processing is done. It is extremely important to implement
	// this method, otherwise internal state of the processor will be maintained
	// between pipeline runs and may result in invalid data.
	Reset()
}

StateProcessor defines the methods required by the state processing pipeline.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL