fluxdb

package module
v0.0.0-...-e4e1abe Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 28, 2021 License: Apache-2.0 Imports: 37 Imported by: 0

README

FluxDB

A temporal database framework for blockchain state.

Concepts

FluxDB aims at easily storing blockchain state at any block height, enabling developers to retrieve current as well as historical data of the blockchain state.

In essence, it's a framework to modelize your data in such way that the library knowns how to store this save this data at any block height.

Tablet

A Tablet in FluxDB is a set of rows grouped under a single "entity" forming a logical set of data for this entity.

An example of this would be all users' balance for a given token. The Tablet entity would be the contract's token while each row would be an account the row's value the balance of this account at a given block height.

Using this information, FluxDB framework will be able to retrieve you the state of all the balances for a given block height, even if all users modified their balance at a different block height.

Singlet

A Singlet in FluxDB is a set of entry for a given state value written in such way that it's possible to efficiently query the current state of the value as well as querying the state at a given block height.

A Singlet is useful when for a single state value, you want the most efficient way to retrieve the current value.

An example of this could be to retrieve current balance of a single user. By using a Singlet, it will be much more efficient to retrieve the single user's balance efficiently instead of using a Tablet that could requires retrieving the value for a few thousand rows for example

Usage

This section is still a work in progress. The best way current way to learn about FluxDB usage is to inspect https://github.com/dfuse-io/dfuse-eosio/tree/develop/statedb and see how it uses this library to create dfuse EOSIO StateDB.

Contributing

Issues and PR in this repo related strictly to the EOSIO protobuf definitions.

Report any protocol-specific issues in their respective repositories

Please first refer to the general dfuse contribution guide, if you wish to contribute to this code base.

License

Apache 2.0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrCleanSourceStop = errors.New("clean source stop")

Functions

func BuildReprocessingPipeline

func BuildReprocessingPipeline(
	blockFilter func(blk *bstream.Block) error,
	blockMapper BlockMapper,
	blockMeta pbblockmeta.BlockIDClient,
	startBlockResolver bstream.StartBlockResolver,
	handler bstream.Handler,
	blocksStore dstore.Store,
	startHeight uint64,
) (bstream.Source, error)

func ErrInvalidKeyLength

func ErrInvalidKeyLength(tag string, expected, actual int) error

func ErrInvalidKeyLengthAtLeast

func ErrInvalidKeyLengthAtLeast(tag string, expected, actual int) error

func NewKVStore

func NewKVStore(dsnString string) (store.KVStore, error)

NewKVStore creates the underlying KV store engine base on the DSN string received.

This exists in `fluxdb` package since it's shared between `app` and `cmd` packages.

func NewPreprocessBlock

func NewPreprocessBlock(mapper BlockMapper) bstream.PreprocessFunc

func RegisterSingletFactory

func RegisterSingletFactory(collection uint16, collectionName string, factory SingletFactory)

RegisterSingletFactory accepts a collection (and its name) as well as a SingletFactory for this Singlet type and register it in the system so it's known to FluxDB internal components.

func RegisterTabletFactory

func RegisterTabletFactory(collection uint16, collectionName string, factory TabletFactory)

RegisterSingletFactory accepts a collection (and its name) as well as a TabletFactory for this Tablet type and register it in the system so it's known to FluxDB internal components.

func SingletEqual

func SingletEqual(left, right Singlet) bool

SingletEqual returns wheter two Singlet instances are considered equal by comparing their respective collection and identifier.

Two nil values are considered equal.

func TabletEqual

func TabletEqual(left, right Tablet) bool

TabletEqual returns wheter two Tablet instances are considered equal by comparing their respective collection and identifier.

Two nil values are considered equal.

Types

type BaseSingletEntry

type BaseSingletEntry struct {
	// contains filtered or unexported fields
}

func NewBaseSingletEntry

func NewBaseSingletEntry(singlet Singlet, height uint64, value []byte) (out BaseSingletEntry)

func (BaseSingletEntry) Height

func (b BaseSingletEntry) Height() uint64

func (BaseSingletEntry) IsDeletion

func (b BaseSingletEntry) IsDeletion() bool

func (BaseSingletEntry) MarshalValue

func (b BaseSingletEntry) MarshalValue() ([]byte, error)

func (BaseSingletEntry) Singlet

func (b BaseSingletEntry) Singlet() Singlet

func (BaseSingletEntry) String

func (b BaseSingletEntry) String() string

func (BaseSingletEntry) Value

func (b BaseSingletEntry) Value() []byte

type BaseTabletRow

type BaseTabletRow struct {
	// contains filtered or unexported fields
}

func NewBaseTabletRow

func NewBaseTabletRow(tablet Tablet, height uint64, primaryKey []byte, value []byte) (out BaseTabletRow)

func (BaseTabletRow) Height

func (b BaseTabletRow) Height() uint64

func (BaseTabletRow) IsDeletion

func (b BaseTabletRow) IsDeletion() bool

func (BaseTabletRow) MarshalValue

func (b BaseTabletRow) MarshalValue() ([]byte, error)

func (BaseTabletRow) PrimaryKey

func (b BaseTabletRow) PrimaryKey() []byte

func (BaseTabletRow) Stringify

func (b BaseTabletRow) Stringify(primaryKey string) string

func (BaseTabletRow) Tablet

func (b BaseTabletRow) Tablet() Tablet

func (BaseTabletRow) Value

func (b BaseTabletRow) Value() []byte

type BlockMapper

type BlockMapper interface {
	Map(rawBlk *bstream.Block) (*WriteRequest, error)
}

type Collection

type Collection struct {
	Identifier uint16
	Name       string
}

type FluxDB

type FluxDB struct {
	*shutter.Shutter

	SpeculativeWritesFetcher func(ctx context.Context, headBlockID string, upToHeight uint64) (speculativeWrites []*WriteRequest)
	HeadBlock                func(ctx context.Context) bstream.BlockRef
	// contains filtered or unexported fields
}

func New

func New(kvStore store.KVStore, blockFilter func(blk *bstream.Block) error, blockMapper BlockMapper, disableIndexing bool) *FluxDB

func (*FluxDB) BuildPipeline

func (fdb *FluxDB) BuildPipeline(
	blockMeta pbblockmeta.BlockIDClient,
	getBlockID bstream.EternalSourceStartBackAtBlock,
	handler bstream.Handler,
	blocksStore dstore.Store,
	blockStreamAddr string,
)

func (*FluxDB) CheckCleanDBForSharding

func (fdb *FluxDB) CheckCleanDBForSharding() error

func (*FluxDB) Close

func (fdb *FluxDB) Close() error

func (*FluxDB) DeleteAllShardCheckpoints

func (fdb *FluxDB) DeleteAllShardCheckpoints(ctx context.Context) error

func (*FluxDB) FetchLastWrittenCheckpoint

func (fdb *FluxDB) FetchLastWrittenCheckpoint(ctx context.Context) (height uint64, block bstream.BlockRef, err error)

func (*FluxDB) HasSeenAnyRowForTablet

func (fdb *FluxDB) HasSeenAnyRowForTablet(ctx context.Context, tablet Tablet) (exists bool, err error)

func (*FluxDB) IndexTables

func (fdb *FluxDB) IndexTables(ctx context.Context) error

func (*FluxDB) IsReady

func (fdb *FluxDB) IsReady() bool

func (*FluxDB) IsSharding

func (fdb *FluxDB) IsSharding() bool

func (*FluxDB) Launch

func (fdb *FluxDB) Launch(disablePipeline bool)

func (*FluxDB) PruneTabletIndexes

func (fdb *FluxDB) PruneTabletIndexes(ctx context.Context, pruneFrequency int, height uint64, lowerBound Tablet, dryRun bool) (tabletCount int, indexCount int, deletedIndexCount int, err error)

func (*FluxDB) ReadSingletEntryAt

func (fdb *FluxDB) ReadSingletEntryAt(
	ctx context.Context,
	singlet Singlet,
	height uint64,
	speculativeWrites []*WriteRequest,
) (SingletEntry, error)

ReadSingletEntryAt query the storage engine returning the active singlet entry value at specified height.

Returns `<Entry>, nil` when an entry has been found, `nil, nil` when no entry was found and finally, `nil, <error>` if an error was encountered while fetching the singlet entry.

func (*FluxDB) ReadTabletAt

func (fdb *FluxDB) ReadTabletAt(
	ctx context.Context,
	height uint64,
	tablet Tablet,
	speculativeWrites []*WriteRequest,
) ([]TabletRow, error)

func (*FluxDB) ReadTabletIndexAt

func (fdb *FluxDB) ReadTabletIndexAt(ctx context.Context, tablet Tablet, height uint64) (*TabletIndex, error)

ReadTabletIndexAt returns the latest active index at the provided height. If there is index available at this height, this method returns `nil` as the index value.

func (*FluxDB) ReadTabletRowAt

func (fdb *FluxDB) ReadTabletRowAt(
	ctx context.Context,
	height uint64,
	tablet Tablet,
	primaryKey TabletRowPrimaryKey,
	speculativeWrites []*WriteRequest,
) (TabletRow, error)

func (*FluxDB) ReindexTablet

func (fdb *FluxDB) ReindexTablet(ctx context.Context, height uint64, tablet Tablet, write bool) (*TabletIndex, bool, error)

func (*FluxDB) ReindexTablets

func (fdb *FluxDB) ReindexTablets(ctx context.Context, height uint64, lowerBound Tablet, dryRun bool) (tabletCount int, indexCount int, err error)

func (*FluxDB) SetIgnoreIndexRange

func (fdb *FluxDB) SetIgnoreIndexRange(startBlock, stopBlock uint64)

func (*FluxDB) SetReady

func (fdb *FluxDB) SetReady()

SetReady marks the process as ready, meaning it has crossed the "close to real-time" threshold.

func (*FluxDB) SetSharding

func (fdb *FluxDB) SetSharding(shardIndex, shardCount int)

func (*FluxDB) SetStopBlock

func (fdb *FluxDB) SetStopBlock(stopBlock uint64)

func (*FluxDB) VerifyAllShardsWritten

func (fdb *FluxDB) VerifyAllShardsWritten(ctx context.Context) (*shardProgressStats, error)

func (*FluxDB) WriteBatch

func (fdb *FluxDB) WriteBatch(ctx context.Context, w []*WriteRequest) error

func (*FluxDB) WriteShardingFinalCheckpoint

func (fdb *FluxDB) WriteShardingFinalCheckpoint(ctx context.Context, height uint64, block bstream.BlockRef) error

type FluxDBHandler

type FluxDBHandler struct {
	// contains filtered or unexported fields
}

FluxDBHandler is a pipeline that writes in FluxDB

func NewHandler

func NewHandler(db *FluxDB) *FluxDBHandler

func (*FluxDBHandler) EnableWriteOnEachIrreversibleStep

func (p *FluxDBHandler) EnableWriteOnEachIrreversibleStep()

func (*FluxDBHandler) EnableWrites

func (p *FluxDBHandler) EnableWrites()

func (*FluxDBHandler) FetchSpeculativeWrites

func (p *FluxDBHandler) FetchSpeculativeWrites(ctx context.Context, headBlockID string, upToHeight uint64) (speculativeWrites []*WriteRequest)

func (*FluxDBHandler) HeadBlock

func (p *FluxDBHandler) HeadBlock(ctx context.Context) bstream.BlockRef

func (*FluxDBHandler) InitializeStartBlockID

func (p *FluxDBHandler) InitializeStartBlockID() (startBlock bstream.BlockRef, err error)

func (*FluxDBHandler) ProcessBlock

func (p *FluxDBHandler) ProcessBlock(rawBlk *bstream.Block, rawObj interface{}) error

type Key

type Key []byte

func (Key) String

func (k Key) String() string

type ShardInjector

type ShardInjector struct {
	*shutter.Shutter
	// contains filtered or unexported fields
}

func NewShardInjector

func NewShardInjector(shardsStore dstore.Store, db *FluxDB) *ShardInjector

func (*ShardInjector) Run

func (s *ShardInjector) Run() (err error)

type Sharder

type Sharder struct {
	// contains filtered or unexported fields
}

func NewSharder

func NewSharder(shardsStore dstore.Store, scratchDirectory string, shardCount int, startBlock, stopBlock uint64) (*Sharder, error)

func (*Sharder) ProcessBlock

func (s *Sharder) ProcessBlock(rawBlk *bstream.Block, rawObj interface{}) error

type Singlet

type Singlet interface {
	// Collection to which this singlet is bound to.
	Collection() uint16

	// Identifier uniquely representing this Singlet instance within its collection.
	Identifier() []byte

	// Entry constructs a concrete SingletEntry implementation for this Singlet
	// at the specified height and associated to this value bytes. The entry usually
	// unmarshals the value and ensure it's valid.
	Entry(height uint64, value []byte) (SingletEntry, error)

	// String should turn the Singlet into a human readable form. If the
	// identifier is composed of multiple part, use `:` to delimit them in the string.
	//
	// You **should** add the collection name to the singlet identifier, it ease recognition
	// of which singlet collection you are seeing.
	//
	// This method is used by the various `Key*#String` helpers, they all expect this implementation
	// to have the collection name appended to them, so you should respect this soft constraint
	// and the `:` delimiter.
	String() string
}

Singlet is a height-aware container for a single piece of information, for example an account's balance.

A Singlet always contain a single row key but stored at any height.

func NewSinglet

func NewSinglet(singletKey []byte) (singlet Singlet, err error)

NewSinglet constructs a new Singlet implementation from the singlet key as stored in the underlying storage engine.

The key received here will always contain the collection prefix (2 bytes) as well as enough bytes to contain the singlet identifier.

**Important** The received key might contain more bytes, like the reminder of a full entry key, all code paths called here must not expected a maximum length and only deal with just enough bytes to read the data needed from the key.

type SingletEntry

type SingletEntry interface {
	Singlet() Singlet
	Height() uint64
	IsDeletion() bool

	MarshalValue() ([]byte, error)

	String() string
}

func NewSingletEntry

func NewSingletEntry(singlet Singlet, key []byte, value []byte) (SingletEntry, error)

func NewSingletEntryFromStorage

func NewSingletEntryFromStorage(key []byte, value []byte) (SingletEntry, error)

type SingletEntryKey

type SingletEntryKey []byte

SingletEntryKey represents a fully well-formed singlet entry key as written to the underlying storage engine. This key is always represented in the same but variable format:

``` <collection (2 bytes)><singlet identifier (N bytes)><height (8 bytes)> ```

Only the singlet implementation knows how to turn its series of bytes into the correct implementation.

func KeyForSingletEntry

func KeyForSingletEntry(entry SingletEntry) (out SingletEntryKey)

func (SingletEntryKey) String

func (k SingletEntryKey) String() string

type SingletFactory

type SingletFactory func(identifier []byte) (Singlet, error)

SingletFactory accepts a singlet identifier bytes and convert it into a valid Singlet concrete implementation.

The received identifier value **never* contains the collection prefix, so you can start parsing the key right away.

**Important** The amout of bytes received in the identifier could be bigger than the actual amount required to form a valid singlet identifier (i.e. len(identifier) != len(singlet.Identifier())). Your factory implementation must not check maximal length and instead deal with a minimal length. This way, it's easy to create a singlet instance from a full key as stored in the underlying engine.

type SingletKey

type SingletKey []byte

SingletKey represents the storage key for a Singlet, contains the collection as well as the Singlet's identifier in byte form.

func KeyForSinglet

func KeyForSinglet(singlet Singlet) (out SingletKey)

func (SingletKey) String

func (k SingletKey) String() string

type SingletKeyAt

type SingletKeyAt []byte

SingletKeyAt represents the storage key for a Singlet at a given height, contains the collection, the Singlet's identifier as well as the height in byte form.

func KeyForSingletAt

func KeyForSingletAt(singlet Singlet, height uint64) (out SingletKeyAt)

func (SingletKeyAt) String

func (k SingletKeyAt) String() string

type Tablet

type Tablet interface {
	// Collection to which this tablet is bound to.
	Collection() uint16

	// Identifier uniquely representing this Tablet instance within its collection.
	Identifier() []byte

	// Row constructs a concrete TabletRow implementation for this Tablet
	// at the specified height, for the given primary key associated to this value
	// bytes. The row usually unmarshals the value and ensure it's valid.
	Row(height uint64, primaryKey []byte, value []byte) (TabletRow, error)

	// String should turn the Tablet into a human readable form. If the
	// identifier is composed of multiple part, use `:` to delimit them in the string.
	//
	// You **should** add the collection name to the tablet identifier, it ease recognition
	// of which tablet collection you are seeing.
	//
	// This method is used by the various `Key*#String` helpers, they all expect this implementation
	// to have the collection name appended to them, so you should respect this soft constraint
	// and the `:` delimiter.
	String() string
}

Tablet is a height-aware temporal table containing all the rows at any given height. Let's assume you have a token contract where the token and there is multiple accounts owning this token. You could track the historical values of balances at any height using a Tablet implementation. The tablet key would be `<contract>:<token>` while the rows would be each of the account owning the token. The primary key of the row would be the account while the value stored in the row would be the balance.

By using the Tablet implementation and fluxdb library, you would then be able to retrieve, at any height, all accounts and their respective balance.

A Tablet always contain 0 to N rows, we maintain the state of each row independently. If a row mutates each height, we will have a total B versions of this exact row in the database, B being the total count of heights seen so far.

In lots of blockchain system, the height is simply the block number value.

func NewTablet

func NewTablet(tabletKey []byte) (tablet Tablet, err error)

NewTablet constructs a new Tablet implementation from the tablet key as stored in the underlying storage engine.

The key received here will always contain the collection prefix (2 bytes) as well as enough bytes to contain the tablet identifier.

**Important** The received key might contain more bytes, like the reminder of a full row key, all code paths called here must not expected a maximum length and only deal with just enough bytes to read the data needed from the key.

type TabletFactory

type TabletFactory func(identifier []byte) (Tablet, error)

TabletFactory accepts a tablet identifier bytes and convert it into a valid Tablet concrete implementation.

The received identifier value **never* contains the collection prefix, so you can start parsing the key right away.

**Important** The amout of bytes received in the identifier could be bigger than the actual amount required to form a valid tablet identifier (i.e. len(identifier) != len(tablet.Identifier())). Your factory implementation must not check maximal length and instead deal with a minimal length. This way, it's easy to create a tablet instance from a full key as stored in the underlying engine.

type TabletIndex

type TabletIndex struct {
	AtHeight           uint64
	SquelchCount       uint64
	PrimaryKeyToHeight *primaryKeyToHeightMap
}

func NewTabletIndex

func NewTabletIndex() *TabletIndex

func (*TabletIndex) MarshalValue

func (i *TabletIndex) MarshalValue() ([]byte, error)

func (*TabletIndex) RowCount

func (i *TabletIndex) RowCount() uint64

func (*TabletIndex) Rows

func (i *TabletIndex) Rows(tablet Tablet) (rows []TabletRow, err error)

Rows return all the rows contained in the index for this tablet without actually hydrating the value of each row (which means that row retrieved using this method will all have `nil` as their value).

This is usef mainly for printing purposes.

type TabletKey

type TabletKey []byte

TabletKey represents the storage key for a Tablet, contains the collection as well as the Tablet's identifier in byte form.

func KeyForTablet

func KeyForTablet(tablet Tablet) (out TabletKey)

func (TabletKey) String

func (k TabletKey) String() string

type TabletKeyAt

type TabletKeyAt []byte

TabletKeyAt represents the storage key for a Tablet at a given height, contains the collection, the Tablet's identifier as well as the height in byte form.

func KeyForTabletAt

func KeyForTabletAt(tablet Tablet, height uint64) (out TabletKeyAt)

func (TabletKeyAt) String

func (k TabletKeyAt) String() string

type TabletRow

type TabletRow interface {
	Tablet() Tablet
	Height() uint64
	PrimaryKey() []byte
	IsDeletion() bool

	MarshalValue() ([]byte, error)

	String() string
}

func NewTabletRow

func NewTabletRow(tablet Tablet, key []byte, value []byte) (TabletRow, error)

NewTabletRow constructs a new TabletRow implementation from the key as stored in the underlying storage engine.

The key received here will always contain the collection prefix (2 bytes) as well as enough bytes to contain the tablet identifier, the height and the primary key.

func NewTabletRowFromStorage

func NewTabletRowFromStorage(key []byte, value []byte) (TabletRow, error)

type TabletRowKey

type TabletRowKey []byte

TabletRowKey represents a fully well-formed tablet row key as written to the underlying storage engine. This key is always represented in the same but variable format:

``` <collection (2 bytes)><tablet identifier (N bytes)><height (8 bytes)><row primary key (N bytes)> ```

Only the tablet implementation knows how to turn its series of bytes into the correct implementation.

func KeyForTabletRow

func KeyForTabletRow(row TabletRow) (out TabletRowKey)

func KeyForTabletRowFromParts

func KeyForTabletRowFromParts(tablet Tablet, height uint64, primaryKey []byte) (out TabletRowKey)

func (TabletRowKey) String

func (k TabletRowKey) String() string

type TabletRowPrimaryKey

type TabletRowPrimaryKey interface {
	Bytes() []byte
	String() string
}

TabletRowPrimaryKey represents a specific primary key for a given TabletRow. It's used mainly for reading a specific tablet row providing easy human readable version for the concrete type of TabletRow.

type WriteRequest

type WriteRequest struct {
	SingletEntries []SingletEntry
	TabletRows     []TabletRow

	Height   uint64
	BlockRef bstream.BlockRef
}

func NewWriteRequestFromProto

func NewWriteRequestFromProto(request *pbfluxdb.WriteRequest) (*WriteRequest, error)

func ReadShard

func ReadShard(reader io.Reader, startAfter uint64) ([]*WriteRequest, error)

func (*WriteRequest) AppendSingletEntry

func (r *WriteRequest) AppendSingletEntry(entry SingletEntry)

func (*WriteRequest) AppendTabletRow

func (r *WriteRequest) AppendTabletRow(row TabletRow)

func (*WriteRequest) ToProto

func (r *WriteRequest) ToProto() (*pbfluxdb.WriteRequest, error)

Directories

Path Synopsis
app
kv

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL