indexing-engine

module
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2020 License: Apache-2.0

README

Indexing Engine

Description

Indexing engine helps to build indexers using simple to use DSL. It's goal is to provide a logical structure to the process of indexing.

Every indexing pipeline has fixed stages available to hook in to:

  • Setup stage: performs setup tasks
  • Syncer stage: creates syncable
  • Fetcher stage: fetches data for indexing
  • Parser stage: parses and normalizes fetched data to a single structure
  • Validator stage: validates parsed data
  • Sequencer stage: Creates sequences from fetched or/and parsed data
  • Aggregator stage: Creates aggregates from fetched or/and parsed data
  • Persistor stage: Saves data to data store
  • Cleanup stage: Cleans up after execution

Besides that there are 2 additional components: Source and Sink. Source is responsible for providing height iterator for the pipeline and Sink is gathering output data which can be used after the pipeline is done processing.

Below flow-chart depicts all the available stages that come with this package and order in which they are executed.

indexing engine flow chart

Please note that all syncing phase stages are executed in sequence, whereas indexing phase stages are executed concurrently in order to speed up the indexing process.

Installation

To install github.com/figment-networks/indexing-engine use:

go get https://github.com/figment-networks/github.com/figment-networks/indexing-engine

Usage

Setting up stages

In order to set a specific stage you can use:

p.SetStage(
  [Name of the stage],
  pipeline.SyncRunner(NewTask()),
)

As a parameter to p.SetStage function you pass in stage name and StageRunner instance. StageRunner is responsible for running individual tasks inside of the stage. This package provides 2 types of StageRunners:

  1. SyncRunner - executes tasks one by one
  2. AsyncRunner - executes tasks concurrently

If you want to use your own method of running task inside of stage, you can easily create your own implementation of StageRunner and pass it in to p.Set[StageName]Stage.

Starting pipeline

Once stages are setup, we can run our pipeline

options := &pipeline.Options{}
if err := p.Start(ctx, NewSource(), NewSink(), options); err != nil {
    return err
}

This will execute all the tasks for every iteration of all the items in the source created with NewSource() If you want to run one-off iteration of pipeline for specific height you can use Run()

height := 100
payload, err := p.Run(ctx, height, options)

It will return a payload collected for that one iteration of the source.

Adding custom stages

If you want to perform some action on but provided stages are not good logic fit for it, you can always add custom stages BEFORE or AFTER existing ones. In order to do that you can use:

  • AddStageBefore - adds stage before provided existing stage
  • AddStageAfter - adds stage after provided existing stage Below is an example showing how you can add custom stage (as a func) after Fetcher stage
const (
    CustomStageName = "AfterFetcher"
)

afterFetcherFunc := pipeline.StageRunnerFunc(func(ctx context.Context, p pipeline.Payload, f pipeline.TaskValidator) error {
    //...
    return nil
})

p.AddStageBefore(pipeline.StageFetcher, CustomStageName, afterFetcherFunc)
Retrying

github.com/figment-networks/indexing-engine provides 2 types of retrying mechanisms:

  • RetryingStageRunner - which is responsible for retrying the entire stage if error occurred
  • RetryingTask - which is responsible for retrying individual tasks if it return error

In order to implement retrying mechanism you need to wrap stage or task with above functions. Here is an example of use of RetryingTask:

p.SetFetcherStage(
   pipeline.AsyncRunner(
       pipeline.RetryingTask(NewFetcherTask(), func(err error) bool {
           // Make error always transient for simplicity
           return true
       }, 3),
   ),
)
Selective execution

Indexing engine provides you with options to run stages and individual tasks selectively. You have 2 options you can use for this purpose:

  • StagesBlacklist - list of stages to NOT execute
  • TasksWhitelist - list of indexing tasks to execute

In order to use above options you have to use setOptions method of pipeline like so:

p.SetOptions(&pipeline.Options{
    TasksWhitelist: []string{"SequencerTask"},
})

Above example would run only SequencerTask during indexing process. It is useful if you want to reindex the data but you only care about specific set of data.

Examples

In /examples folder you can find an example of a pipeline. To run it use:

go run example/main.go

To-Dos:

  • Collect stats for every stage and every task
  • Add context cancellation

Directories

Path Synopsis
mock
Package mock_pipeline is a generated GoMock package.
Package mock_pipeline is a generated GoMock package.
store

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL