
README


The dfuse Search engine is an innovative, fork-aware blockchain search engine that serves both historical and real-time queries. It is part of dfuse.

Features

It can act as a distributed system composed of real-time and archive backends, plus a router that addresses the right backends, discovered through an etcd cluster.

It supports massively parallelized indexing of the chain (given enough compute, it can process 20 TB of data in 30 minutes). It is designed for high availability and scales horizontally.

It feeds from a dfuse source, such as dfuse for EOSIO.

Installation & Usage

See the different protocol-specific dfuse binaries at https://github.com/dfuse-io/dfuse#protocols

Current search implementations are provided by the protocol-specific dfuse repositories linked above.

Contributing

Issues and PRs in this repo should relate strictly to the core search engine.

Report any protocol-specific issues in their respective repositories.

If you wish to contribute to this code base, please first refer to the general dfuse contribution guide.

This codebase uses unit tests extensively; please write and run tests.

License

Apache 2.0

Documentation


Constants

const MaxInt = int(^uint(0) >> 1) // largest value representable by the platform's int
const TimeFormatBleveID = "2006-01-02T15-04-05.000" // reference-time layout used in bleve index IDs
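
A minimal, self-contained sketch of formatting a block time with this layout (the usage shown is an assumption, not taken from the package docs); the dashes in place of colons keep the formatted IDs filesystem- and URL-safe:

package main

import (
	"fmt"
	"time"
)

// Mirrors the package constant shown above.
const TimeFormatBleveID = "2006-01-02T15-04-05.000"

func main() {
	blockTime := time.Date(2020, 6, 22, 15, 4, 5, 123_000_000, time.UTC)
	fmt.Println(blockTime.Format(TimeFormatBleveID)) // 2020-06-22T15-04-05.123
}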

Variables

var BoolFieldMapping *mapping.FieldMapping
var DisabledMapping *mapping.DocumentMapping

General purpose mappers

var DynamicNestedDocMapping *mapping.DocumentMapping
var ErrEndOfRange = errors.New("end of block range")
var GetSearchMatchFactory func() SearchMatch
var SortableNumericFieldMapping *mapping.FieldMapping
var TestMatchCollector = func(ctx context.Context, lowBlockNum, highBlockNum uint64, results bsearch.DocumentMatchCollection) (out []SearchMatch, err error) {
	trxs := make(map[string][]uint16)
	var trxList []*testTrxResult

	// Group matching action indexes by transaction ID, preserving the
	// first-seen order of transactions.
	for _, el := range results {
		if err := ctx.Err(); err != nil {
			return nil, err
		}

		blockNum, trxID, actionIdx, skip := testExplodeDocumentID(el.ID)
		if skip {
			continue
		}

		if blockNum < lowBlockNum || blockNum > highBlockNum {
			continue
		}

		if _, found := trxs[trxID]; !found {
			trxList = append(trxList, &testTrxResult{
				id:       trxID,
				blockNum: blockNum,
			})
		}

		trxs[trxID] = append(trxs[trxID], actionIdx)
	}

	// Emit one match per transaction, with its action indexes sorted ascending.
	for _, trx := range trxList {
		actions := trxs[trx.id]
		sort.Slice(actions, func(i, j int) bool { return actions[i] < actions[j] })

		out = append(out, &testSearchMatch{
			blockNumber:   trx.blockNum,
			trxIDPrefix:   trx.id,
			actionIndexes: actions,
		})
	}

	return out, nil
}
var TxtFieldMapping *mapping.FieldMapping

Functions

func CheckIndexIntegrity

func CheckIndexIntegrity(path string, shardSize uint64) (*indexMetaInfo, error)

func DoForFirstNChunks

func DoForFirstNChunks(input []byte, nchunks int, chunkSize int, function func(idxRef int, chunkRef []byte))
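A minimal usage sketch, assuming the package is imported as `search` from github.com/dfuse-io/search, and assuming (this is not documented here) that the function slices `input` into chunkSize-byte chunks and calls `function` on at most the first nchunks of them:

func inspectChunks(payload []byte) {
	// assumed behavior: visit the first 4 chunks of 8 bytes each;
	// idxRef would be the chunk's position, chunkRef its bytes
	search.DoForFirstNChunks(payload, 4, 8, func(idxRef int, chunkRef []byte) {
		fmt.Printf("chunk %d: %s\n", idxRef, chunkRef)
	})
}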

func GetIrreversibleBlock

func GetIrreversibleBlock(blockmetaCli pbblockmeta.BlockIDClient, blockNum uint64, ctx context.Context, retries int) (bstream.BlockRef, error)

GetIrreversibleBlock does whatever it takes to fetch the irreversible block at height `blockNum`, up to the given number of retries; -1 retries means retry forever.
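
A usage sketch; blockmetaCli is assumed to be an already-dialed pbblockmeta.BlockIDClient. Note that the context is the third parameter, matching the signature above:

func resolveIrr(ctx context.Context, blockmetaCli pbblockmeta.BlockIDClient) (bstream.BlockRef, error) {
	// retry up to 3 times; pass -1 to retry forever
	return search.GetIrreversibleBlock(blockmetaCli, 5000, ctx, 3)
}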

func GetLibBlock

func GetLibBlock(blockmetaCli pbblockmeta.BlockIDClient) (bstream.BlockRef, error)

func GetLibInfo

func GetLibInfo(headinfoCli pbheadinfo.HeadInfoClient) (bstream.BlockRef, error)

func MapFirstNChunks

func MapFirstNChunks(input []byte, nChunks int, chunkSize int) map[string]string

func NewCursor

func NewCursor(blockNum uint64, headBlockID string, trxPrefix string) string

Cursor format, v0: `0:blockNum:(irr|headBlockID):trxPrefix`
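
For example (the head block ID and transaction prefix below are hypothetical placeholders):

cursor := search.NewCursor(5000, "0000138adeadbeef", "abcdef12")
// given the v0 layout above, this should yield:
//   0:5000:0000138adeadbeef:abcdef12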

func ValidateRegistry

func ValidateRegistry() error

Types

type BleveQuery

type BleveQuery struct {
	Raw string

	FieldTransformer querylang.FieldTransformer

	FieldNames []string
	Validator  BleveQueryValdiator
	// contains filtered or unexported fields
}

func NewParsedQuery

func NewParsedQuery(rawQuery string) (*BleveQuery, error)

func (*BleveQuery) BleveQuery

func (q *BleveQuery) BleveQuery() query.Query

func (*BleveQuery) Hash

func (q *BleveQuery) Hash() (string, error)

func (*BleveQuery) Parse

func (q *BleveQuery) Parse() error

func (*BleveQuery) Validate

func (q *BleveQuery) Validate() error
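
A sketch of the typical flow; the query string and its field names are hypothetical examples, and Validate is assumed to run the configured Validator, if any:

func buildQuery(raw string) (*search.BleveQuery, error) {
	q, err := search.NewParsedQuery(raw)
	if err != nil {
		return nil, err
	}
	if err := q.Validate(); err != nil {
		return nil, err
	}
	// q.Hash() gives a stable identifier for the parsed query (e.g. for
	// caching); q.BleveQuery() is ready to hand to a bleve search.
	return q, nil
}

Called as, e.g., buildQuery(`account:eosio.token action:transfer`).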

type BleveQueryFactory

type BleveQueryFactory func(rawQuery string) *BleveQuery
var GetBleveQueryFactory BleveQueryFactory

type BleveQueryValdiator

type BleveQueryValdiator interface {
	Validate(q *BleveQuery) error
}

type BlockMapper

type BlockMapper interface {
	Map(mapper *mapping.IndexMappingImpl, block *bstream.Block) ([]*document.Document, error)
	IndexMapping() *mapping.IndexMappingImpl
}
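
A minimal sketch of a protocol-specific implementation; the document ID scheme and field names are illustrative assumptions, not taken from any real integration:

type myBlockMapper struct {
	m *mapping.IndexMappingImpl // e.g. built with bleve.NewIndexMapping()
}

func (b *myBlockMapper) IndexMapping() *mapping.IndexMappingImpl { return b.m }

func (b *myBlockMapper) Map(mapper *mapping.IndexMappingImpl, blk *bstream.Block) ([]*document.Document, error) {
	// one document per block, keyed by block number (hypothetical scheme)
	doc := document.NewDocument(fmt.Sprintf("blk:%d", blk.Num()))
	if err := mapper.MapDocument(doc, map[string]interface{}{
		"block_num": blk.Num(),
	}); err != nil {
		return nil, err
	}
	return []*document.Document{doc}, nil
}

Such a mapper is what NewMapper (below) and NewPreIndexer expect to drive indexing.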

type BoundaryBlockInfo

type BoundaryBlockInfo struct {
	Num  uint64
	ID   string
	Time time.Time
}

type IndexedField

type IndexedField struct {
	Name      string    `json:"name"`
	ValueType ValueType `json:"type"`
}

type IndexedFieldsMapFunc

type IndexedFieldsMapFunc func() map[string]*IndexedField

type Mapper

type Mapper struct {
	// contains filtered or unexported fields
}

func NewMapper

func NewMapper(blockMapper BlockMapper) (*Mapper, error)

func (*Mapper) MapBlock

func (m *Mapper) MapBlock(blk *bstream.Block) ([]*document.Document, error)

func (*Mapper) PreprocessBlock

func (m *Mapper) PreprocessBlock(blk *bstream.Block) (interface{}, error)

type MatchCollector

type MatchCollector func(ctx context.Context, lowBlockNum, highBlockNum uint64, results bsearch.DocumentMatchCollection) ([]SearchMatch, error)
var GetMatchCollector MatchCollector

type MultiError

type MultiError []error

func (MultiError) Error

func (m MultiError) Error() string

type PreIndexer

type PreIndexer struct {
	// contains filtered or unexported fields
}

PreIndexer is a bstream preprocessor that returns the bleve object instead of a bstream.Block.

func NewPreIndexer

func NewPreIndexer(blockMapper BlockMapper, liveIndexesPath string) *PreIndexer

func (*PreIndexer) Preprocess

func (i *PreIndexer) Preprocess(blk *bstream.Block) (interface{}, error)

type QueryMetrics

type QueryMetrics struct {

	// The number of transactions we have seen from our shard results
	TransactionSeenCount int64

	// The number of indexes actually walked, whether or not they were used to return results (canceled indexes included)
	SearchedIndexesCount *atomic.Uint32

	// The number of actually useful searched indexes, i.e. those that were used to return results
	UtilizedIndexesCount *atomic.Uint32

	// The total transaction count across the utilized indexes
	UtilizedTrxCount *atomic.Uint32

	// The total time spent cumulatively in each utilized index
	UtilizedTotalDuration *atomic.Duration
	// contains filtered or unexported fields
}

func NewQueryMetrics

func NewQueryMetrics(zlog *zap.Logger, descending bool, query string, shardSize uint64, low, high uint64) *QueryMetrics

func (*QueryMetrics) Finalize

func (m *QueryMetrics) Finalize()

func (*QueryMetrics) MarkFirstResult

func (m *QueryMetrics) MarkFirstResult()
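
Typical lifecycle, as suggested by the constructor and methods above (a sketch; zlog is an existing *zap.Logger and q a parsed BleveQuery):

func runQuery(zlog *zap.Logger, q *search.BleveQuery, low, high uint64) {
	metrics := search.NewQueryMetrics(zlog, false, q.Raw, 5000, low, high)
	defer metrics.Finalize()
	// during the query, call metrics.MarkFirstResult() when the first
	// match is produced, and update the atomic counters as indexes
	// are searched (assumed usage).
}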

type SearchMatch

type SearchMatch interface {
	BlockNum() uint64
	TransactionIDPrefix() string

	GetIndex() uint64
	SetIndex(index uint64)

	FillProtoSpecific(match *pbsearch.SearchMatch, blk *bstream.Block) error
}

func RunSingleIndexQuery

func RunSingleIndexQuery(
	ctx context.Context,
	sortDesc bool,
	lowBlockNum, highBlockNum uint64,
	matchCollector MatchCollector,
	bquery *BleveQuery,
	index index.Index,
	releaseIndex func(),
	metrics *QueryMetrics,
) (
	out []SearchMatch,
	err error,
)
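
A sketch wiring the pieces together; idx and releaseIdx would come from an index pool or a ShardIndex in a real deployment, and the query string is hypothetical:

func queryIndex(ctx context.Context, idx index.Index, releaseIdx func(), metrics *search.QueryMetrics) ([]search.SearchMatch, error) {
	bq, err := search.NewParsedQuery(`action:transfer`)
	if err != nil {
		return nil, err
	}
	return search.RunSingleIndexQuery(
		ctx,
		false,          // ascending result order
		100000, 105000, // block range this index is expected to cover
		search.GetMatchCollector, // protocol-registered collector
		bq,
		idx,
		releaseIdx, // invoked once the query is done with the index
		metrics,
	)
}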

type ShardIndex

type ShardIndex struct {
	index.Index

	IndexBuilder    index.IndexBuilder
	IndexTargetPath string

	// These two values represent the "potential" start and end
	// blocks. They do not imply there is actual data within those
	// two blocks: e.g., if `endBlock` contained 0 transactions, we
	// would not shrink `endBlock`.
	//
	// The chain of [startBlock, endBlock] -> [startBlock, endBlock]
	// *must* be absolutely continuous from index to index within the
	// process, and between the different segments of indexes
	// (readOnly, merging, writable, and live).
	StartBlock     uint64 // inclusive
	StartBlockID   string
	StartBlockTime time.Time
	EndBlock       uint64 // inclusive
	EndBlockID     string
	EndBlockTime   time.Time

	Lock sync.RWMutex
	// contains filtered or unexported fields
}

func NewShardIndexWithAnalysisQueue

func NewShardIndexWithAnalysisQueue(baseBlockNum uint64, shardSize uint64, idx index.Index, pathFunc filePathFunc, analysisQueue *index.AnalysisQueue) (*ShardIndex, error)

func (*ShardIndex) Close

func (s *ShardIndex) Close() error

func (*ShardIndex) GetBoundaryBlocks

func (s *ShardIndex) GetBoundaryBlocks(idx index.Index) (start *BoundaryBlockInfo, end *BoundaryBlockInfo, err error)

func (*ShardIndex) RequestCoversFullRange

func (s *ShardIndex) RequestCoversFullRange(low, high uint64) bool

func (*ShardIndex) WritablePath

func (s *ShardIndex) WritablePath(suffix string) string

type SingleIndex

type SingleIndex struct {
	index.Index
	// contains filtered or unexported fields
}

func (*SingleIndex) Delete

func (i *SingleIndex) Delete()

func (*SingleIndex) GetIndex

func (i *SingleIndex) GetIndex() index.Index

type ValueType

type ValueType int32
const (
	AccountType ValueType = iota
	AddressType
	ActionType
	ActionIndexType
	AssetType
	BooleanType
	BlockNumType
	HexType
	FreeFormType
	NameType
	PermissionType
	TransactionIDType
	NumberType
)
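
A hypothetical field registry showing how IndexedField, ValueType, and IndexedFieldsMapFunc fit together (names and type assignments are illustrative only):

var indexedFields = map[string]*search.IndexedField{
	"account":   {Name: "account", ValueType: search.AccountType},
	"action":    {Name: "action", ValueType: search.ActionType},
	"block_num": {Name: "block_num", ValueType: search.BlockNumType},
}

// fieldsMap satisfies IndexedFieldsMapFunc (defined above).
func fieldsMap() map[string]*search.IndexedField { return indexedFields }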
