Documentation ¶
Index ¶
- Variables
- func BuildReprocessingPipeline(blockFilter func(blk *bstream.Block) error, blockMapper BlockMapper, ...) (bstream.Source, error)
- func ErrInvalidKeyLength(tag string, expected, actual int) error
- func ErrInvalidKeyLengthAtLeast(tag string, expected, actual int) error
- func NewKVStore(dsnString string) (store.KVStore, error)
- func NewPreprocessBlock(mapper BlockMapper) bstream.PreprocessFunc
- func RegisterSingletFactory(collection uint16, collectionName string, factory SingletFactory)
- func RegisterTabletFactory(collection uint16, collectionName string, factory TabletFactory)
- func SingletEqual(left, right Singlet) bool
- func TabletEqual(left, right Tablet) bool
- type BaseSingletEntry
- type BaseTabletRow
- func (b BaseTabletRow) Height() uint64
- func (b BaseTabletRow) IsDeletion() bool
- func (b BaseTabletRow) MarshalValue() ([]byte, error)
- func (b BaseTabletRow) PrimaryKey() []byte
- func (b BaseTabletRow) Stringify(primaryKey string) string
- func (b BaseTabletRow) Tablet() Tablet
- func (b BaseTabletRow) Value() []byte
- type BlockMapper
- type Collection
- type FluxDB
- func (fdb *FluxDB) BuildPipeline(blockMeta pbblockmeta.BlockIDClient, ...)
- func (fdb *FluxDB) CheckCleanDBForSharding() error
- func (fdb *FluxDB) Close() error
- func (fdb *FluxDB) DeleteAllShardCheckpoints(ctx context.Context) error
- func (fdb *FluxDB) FetchLastWrittenCheckpoint(ctx context.Context) (height uint64, block bstream.BlockRef, err error)
- func (fdb *FluxDB) HasSeenAnyRowForTablet(ctx context.Context, tablet Tablet) (exists bool, err error)
- func (fdb *FluxDB) IndexTables(ctx context.Context) error
- func (fdb *FluxDB) IsReady() bool
- func (fdb *FluxDB) IsSharding() bool
- func (fdb *FluxDB) Launch(disablePipeline bool)
- func (fdb *FluxDB) PruneTabletIndexes(ctx context.Context, pruneFrequency int, height uint64, lowerBound Tablet, ...) (tabletCount int, indexCount int, deletedIndexCount int, err error)
- func (fdb *FluxDB) ReadSingletEntryAt(ctx context.Context, singlet Singlet, height uint64, ...) (SingletEntry, error)
- func (fdb *FluxDB) ReadTabletAt(ctx context.Context, height uint64, tablet Tablet, ...) ([]TabletRow, error)
- func (fdb *FluxDB) ReadTabletIndexAt(ctx context.Context, tablet Tablet, height uint64) (*TabletIndex, error)
- func (fdb *FluxDB) ReadTabletRowAt(ctx context.Context, height uint64, tablet Tablet, ...) (TabletRow, error)
- func (fdb *FluxDB) ReindexTablet(ctx context.Context, height uint64, tablet Tablet, write bool) (*TabletIndex, bool, error)
- func (fdb *FluxDB) ReindexTablets(ctx context.Context, height uint64, lowerBound Tablet, dryRun bool) (tabletCount int, indexCount int, err error)
- func (fdb *FluxDB) SetIgnoreIndexRange(startBlock, stopBlock uint64)
- func (fdb *FluxDB) SetReady()
- func (fdb *FluxDB) SetSharding(shardIndex, shardCount int)
- func (fdb *FluxDB) SetStopBlock(stopBlock uint64)
- func (fdb *FluxDB) VerifyAllShardsWritten(ctx context.Context) (*shardProgressStats, error)
- func (fdb *FluxDB) WriteBatch(ctx context.Context, w []*WriteRequest) error
- func (fdb *FluxDB) WriteShardingFinalCheckpoint(ctx context.Context, height uint64, block bstream.BlockRef) error
- type FluxDBHandler
- func (p *FluxDBHandler) EnableWriteOnEachIrreversibleStep()
- func (p *FluxDBHandler) EnableWrites()
- func (p *FluxDBHandler) FetchSpeculativeWrites(ctx context.Context, headBlockID string, upToHeight uint64) (speculativeWrites []*WriteRequest)
- func (p *FluxDBHandler) HeadBlock(ctx context.Context) bstream.BlockRef
- func (p *FluxDBHandler) InitializeStartBlockID() (startBlock bstream.BlockRef, err error)
- func (p *FluxDBHandler) ProcessBlock(rawBlk *bstream.Block, rawObj interface{}) error
- type Key
- type ShardInjector
- type Sharder
- type Singlet
- type SingletEntry
- type SingletEntryKey
- type SingletFactory
- type SingletKey
- type SingletKeyAt
- type Tablet
- type TabletFactory
- type TabletIndex
- type TabletKey
- type TabletKeyAt
- type TabletRow
- type TabletRowKey
- type TabletRowPrimaryKey
- type WriteRequest
Constants ¶
This section is empty.
Variables ¶
var ErrCleanSourceStop = errors.New("clean source stop")
Functions ¶
func BuildReprocessingPipeline ¶
func BuildReprocessingPipeline( blockFilter func(blk *bstream.Block) error, blockMapper BlockMapper, blockMeta pbblockmeta.BlockIDClient, startBlockResolver bstream.StartBlockResolver, handler bstream.Handler, blocksStore dstore.Store, startHeight uint64, ) (bstream.Source, error)
func ErrInvalidKeyLength ¶
func NewKVStore ¶
NewKVStore creates the underlying KV store engine based on the DSN string received.
This exists in `fluxdb` package since it's shared between `app` and `cmd` packages.
func NewPreprocessBlock ¶
func NewPreprocessBlock(mapper BlockMapper) bstream.PreprocessFunc
func RegisterSingletFactory ¶
func RegisterSingletFactory(collection uint16, collectionName string, factory SingletFactory)
RegisterSingletFactory accepts a collection (and its name) as well as a SingletFactory for this Singlet type and registers it in the system so it's known to FluxDB internal components.
func RegisterTabletFactory ¶
func RegisterTabletFactory(collection uint16, collectionName string, factory TabletFactory)
RegisterTabletFactory accepts a collection (and its name) as well as a TabletFactory for this Tablet type and registers it in the system so it's known to FluxDB internal components.
func SingletEqual ¶
SingletEqual returns whether two Singlet instances are considered equal by comparing their respective collection and identifier.
Two nil values are considered equal.
func TabletEqual ¶
TabletEqual returns whether two Tablet instances are considered equal by comparing their respective collection and identifier.
Two nil values are considered equal.
Types ¶
type BaseSingletEntry ¶
type BaseSingletEntry struct {
// contains filtered or unexported fields
}
func NewBaseSingletEntry ¶
func NewBaseSingletEntry(singlet Singlet, height uint64, value []byte) (out BaseSingletEntry)
func (BaseSingletEntry) Height ¶
func (b BaseSingletEntry) Height() uint64
func (BaseSingletEntry) IsDeletion ¶
func (b BaseSingletEntry) IsDeletion() bool
func (BaseSingletEntry) MarshalValue ¶
func (b BaseSingletEntry) MarshalValue() ([]byte, error)
func (BaseSingletEntry) Singlet ¶
func (b BaseSingletEntry) Singlet() Singlet
func (BaseSingletEntry) String ¶
func (b BaseSingletEntry) String() string
func (BaseSingletEntry) Value ¶
func (b BaseSingletEntry) Value() []byte
type BaseTabletRow ¶
type BaseTabletRow struct {
// contains filtered or unexported fields
}
func NewBaseTabletRow ¶
func NewBaseTabletRow(tablet Tablet, height uint64, primaryKey []byte, value []byte) (out BaseTabletRow)
func (BaseTabletRow) Height ¶
func (b BaseTabletRow) Height() uint64
func (BaseTabletRow) IsDeletion ¶
func (b BaseTabletRow) IsDeletion() bool
func (BaseTabletRow) MarshalValue ¶
func (b BaseTabletRow) MarshalValue() ([]byte, error)
func (BaseTabletRow) PrimaryKey ¶
func (b BaseTabletRow) PrimaryKey() []byte
func (BaseTabletRow) Stringify ¶
func (b BaseTabletRow) Stringify(primaryKey string) string
func (BaseTabletRow) Tablet ¶
func (b BaseTabletRow) Tablet() Tablet
func (BaseTabletRow) Value ¶
func (b BaseTabletRow) Value() []byte
type BlockMapper ¶
type BlockMapper interface {
Map(rawBlk *bstream.Block) (*WriteRequest, error)
}
type Collection ¶
type FluxDB ¶
type FluxDB struct { *shutter.Shutter SpeculativeWritesFetcher func(ctx context.Context, headBlockID string, upToHeight uint64) (speculativeWrites []*WriteRequest) HeadBlock func(ctx context.Context) bstream.BlockRef // contains filtered or unexported fields }
func (*FluxDB) BuildPipeline ¶
func (fdb *FluxDB) BuildPipeline( blockMeta pbblockmeta.BlockIDClient, getBlockID bstream.EternalSourceStartBackAtBlock, handler bstream.Handler, blocksStore dstore.Store, blockStreamAddr string, )
func (*FluxDB) CheckCleanDBForSharding ¶
func (*FluxDB) DeleteAllShardCheckpoints ¶
func (*FluxDB) FetchLastWrittenCheckpoint ¶
func (*FluxDB) HasSeenAnyRowForTablet ¶
func (*FluxDB) IsSharding ¶
func (*FluxDB) PruneTabletIndexes ¶
func (*FluxDB) ReadSingletEntryAt ¶
func (fdb *FluxDB) ReadSingletEntryAt( ctx context.Context, singlet Singlet, height uint64, speculativeWrites []*WriteRequest, ) (SingletEntry, error)
ReadSingletEntryAt queries the storage engine, returning the active singlet entry value at the specified height.
Returns `<Entry>, nil` when an entry has been found, `nil, nil` when no entry was found and finally, `nil, <error>` if an error was encountered while fetching the singlet entry.
func (*FluxDB) ReadTabletAt ¶
func (*FluxDB) ReadTabletIndexAt ¶
func (fdb *FluxDB) ReadTabletIndexAt(ctx context.Context, tablet Tablet, height uint64) (*TabletIndex, error)
ReadTabletIndexAt returns the latest active index at the provided height. If there is no index available at this height, this method returns `nil` as the index value.
func (*FluxDB) ReadTabletRowAt ¶
func (fdb *FluxDB) ReadTabletRowAt( ctx context.Context, height uint64, tablet Tablet, primaryKey TabletRowPrimaryKey, speculativeWrites []*WriteRequest, ) (TabletRow, error)
func (*FluxDB) ReindexTablet ¶
func (*FluxDB) ReindexTablets ¶
func (*FluxDB) SetIgnoreIndexRange ¶
func (*FluxDB) SetReady ¶
func (fdb *FluxDB) SetReady()
SetReady marks the process as ready, meaning it has crossed the "close to real-time" threshold.
func (*FluxDB) SetSharding ¶
func (*FluxDB) SetStopBlock ¶
func (*FluxDB) VerifyAllShardsWritten ¶
func (*FluxDB) WriteBatch ¶
func (fdb *FluxDB) WriteBatch(ctx context.Context, w []*WriteRequest) error
type FluxDBHandler ¶
type FluxDBHandler struct {
// contains filtered or unexported fields
}
FluxDBHandler is a pipeline that writes in FluxDB
func NewHandler ¶
func NewHandler(db *FluxDB) *FluxDBHandler
func (*FluxDBHandler) EnableWriteOnEachIrreversibleStep ¶
func (p *FluxDBHandler) EnableWriteOnEachIrreversibleStep()
func (*FluxDBHandler) EnableWrites ¶
func (p *FluxDBHandler) EnableWrites()
func (*FluxDBHandler) FetchSpeculativeWrites ¶
func (p *FluxDBHandler) FetchSpeculativeWrites(ctx context.Context, headBlockID string, upToHeight uint64) (speculativeWrites []*WriteRequest)
func (*FluxDBHandler) HeadBlock ¶
func (p *FluxDBHandler) HeadBlock(ctx context.Context) bstream.BlockRef
func (*FluxDBHandler) InitializeStartBlockID ¶
func (p *FluxDBHandler) InitializeStartBlockID() (startBlock bstream.BlockRef, err error)
func (*FluxDBHandler) ProcessBlock ¶
func (p *FluxDBHandler) ProcessBlock(rawBlk *bstream.Block, rawObj interface{}) error
type ShardInjector ¶
func NewShardInjector ¶
func NewShardInjector(shardsStore dstore.Store, db *FluxDB) *ShardInjector
func (*ShardInjector) Run ¶
func (s *ShardInjector) Run() (err error)
type Singlet ¶
type Singlet interface { // Collection to which this singlet is bound to. Collection() uint16 // Identifier uniquely representing this Singlet instance within its collection. Identifier() []byte // Entry constructs a concrete SingletEntry implementation for this Singlet // at the specified height and associated to this value bytes. The entry usually // unmarshals the value and ensure it's valid. Entry(height uint64, value []byte) (SingletEntry, error) // String should turn the Singlet into a human readable form. If the // identifier is composed of multiple part, use `:` to delimit them in the string. // // You **should** add the collection name to the singlet identifier, it ease recognition // of which singlet collection you are seeing. // // This method is used by the various `Key*#String` helpers, they all expect this implementation // to have the collection name appended to them, so you should respect this soft constraint // and the `:` delimiter. String() string }
Singlet is a height-aware container for a single piece of information, for example an account's balance.
A Singlet always contains a single row key, stored at any height.
func NewSinglet ¶
NewSinglet constructs a new Singlet implementation from the singlet key as stored in the underlying storage engine.
The key received here will always contain the collection prefix (2 bytes) as well as enough bytes to contain the singlet identifier.
**Important** The received key might contain more bytes, like the remainder of a full entry key; all code paths called here must not expect a maximum length and must only deal with just enough bytes to read the data needed from the key.
type SingletEntry ¶
type SingletEntry interface { Singlet() Singlet Height() uint64 IsDeletion() bool MarshalValue() ([]byte, error) String() string }
func NewSingletEntry ¶
func NewSingletEntry(singlet Singlet, key []byte, value []byte) (SingletEntry, error)
func NewSingletEntryFromStorage ¶
func NewSingletEntryFromStorage(key []byte, value []byte) (SingletEntry, error)
type SingletEntryKey ¶
type SingletEntryKey []byte
SingletEntryKey represents a fully well-formed singlet entry key as written to the underlying storage engine. This key is always represented in the same but variable format:
``` <collection (2 bytes)><singlet identifier (N bytes)><height (8 bytes)> ```
Only the singlet implementation knows how to turn its series of bytes into the correct implementation.
func KeyForSingletEntry ¶
func KeyForSingletEntry(entry SingletEntry) (out SingletEntryKey)
func (SingletEntryKey) String ¶
func (k SingletEntryKey) String() string
type SingletFactory ¶
SingletFactory accepts a singlet identifier bytes and convert it into a valid Singlet concrete implementation.
The received identifier value **never** contains the collection prefix, so you can start parsing the key right away.
**Important** The amount of bytes received in the identifier could be bigger than the actual amount required to form a valid singlet identifier (i.e. len(identifier) != len(singlet.Identifier())). Your factory implementation must not check maximal length and instead deal with a minimal length. This way, it's easy to create a singlet instance from a full key as stored in the underlying engine.
type SingletKey ¶
type SingletKey []byte
SingletKey represents the storage key for a Singlet, contains the collection as well as the Singlet's identifier in byte form.
func KeyForSinglet ¶
func KeyForSinglet(singlet Singlet) (out SingletKey)
func (SingletKey) String ¶
func (k SingletKey) String() string
type SingletKeyAt ¶
type SingletKeyAt []byte
SingletKeyAt represents the storage key for a Singlet at a given height, contains the collection, the Singlet's identifier as well as the height in byte form.
func KeyForSingletAt ¶
func KeyForSingletAt(singlet Singlet, height uint64) (out SingletKeyAt)
func (SingletKeyAt) String ¶
func (k SingletKeyAt) String() string
type Tablet ¶
type Tablet interface { // Collection to which this tablet is bound to. Collection() uint16 // Identifier uniquely representing this Tablet instance within its collection. Identifier() []byte // Row constructs a concrete TabletRow implementation for this Tablet // at the specified height, for the given primary key associated to this value // bytes. The row usually unmarshals the value and ensure it's valid. Row(height uint64, primaryKey []byte, value []byte) (TabletRow, error) // String should turn the Tablet into a human readable form. If the // identifier is composed of multiple part, use `:` to delimit them in the string. // // You **should** add the collection name to the tablet identifier, it ease recognition // of which tablet collection you are seeing. // // This method is used by the various `Key*#String` helpers, they all expect this implementation // to have the collection name appended to them, so you should respect this soft constraint // and the `:` delimiter. String() string }
Tablet is a height-aware temporal table containing all the rows at any given height. Let's assume you have a token contract that defines a token, and there are multiple accounts owning this token. You could track the historical values of balances at any height using a Tablet implementation. The tablet key would be `<contract>:<token>` while the rows would be each of the accounts owning the token. The primary key of the row would be the account while the value stored in the row would be the balance.
By using the Tablet implementation and fluxdb library, you would then be able to retrieve, at any height, all accounts and their respective balance.
A Tablet always contains 0 to N rows, we maintain the state of each row independently. If a row mutates at each height, we will have a total of B versions of this exact row in the database, B being the total count of heights seen so far.
In lots of blockchain systems, the height is simply the block number value.
func NewTablet ¶
NewTablet constructs a new Tablet implementation from the tablet key as stored in the underlying storage engine.
The key received here will always contain the collection prefix (2 bytes) as well as enough bytes to contain the tablet identifier.
**Important** The received key might contain more bytes, like the remainder of a full row key; all code paths called here must not expect a maximum length and must only deal with just enough bytes to read the data needed from the key.
type TabletFactory ¶
TabletFactory accepts a tablet identifier bytes and convert it into a valid Tablet concrete implementation.
The received identifier value **never** contains the collection prefix, so you can start parsing the key right away.
**Important** The amount of bytes received in the identifier could be bigger than the actual amount required to form a valid tablet identifier (i.e. len(identifier) != len(tablet.Identifier())). Your factory implementation must not check maximal length and instead deal with a minimal length. This way, it's easy to create a tablet instance from a full key as stored in the underlying engine.
type TabletIndex ¶
type TabletIndex struct { AtHeight uint64 SquelchCount uint64 PrimaryKeyToHeight *primaryKeyToHeightMap }
func NewTabletIndex ¶
func NewTabletIndex() *TabletIndex
func (*TabletIndex) MarshalValue ¶
func (i *TabletIndex) MarshalValue() ([]byte, error)
func (*TabletIndex) RowCount ¶
func (i *TabletIndex) RowCount() uint64
func (*TabletIndex) Rows ¶
func (i *TabletIndex) Rows(tablet Tablet) (rows []TabletRow, err error)
Rows returns all the rows contained in the index for this tablet without actually hydrating the value of each row (which means that rows retrieved using this method will all have `nil` as their value).
This is useful mainly for printing purposes.
type TabletKey ¶
type TabletKey []byte
TabletKey represents the storage key for a Tablet, contains the collection as well as the Tablet's identifier in byte form.
func KeyForTablet ¶
type TabletKeyAt ¶
type TabletKeyAt []byte
TabletKeyAt represents the storage key for a Tablet at a given height, contains the collection, the Tablet's identifier as well as the height in byte form.
func KeyForTabletAt ¶
func KeyForTabletAt(tablet Tablet, height uint64) (out TabletKeyAt)
func (TabletKeyAt) String ¶
func (k TabletKeyAt) String() string
type TabletRow ¶
type TabletRow interface { Tablet() Tablet Height() uint64 PrimaryKey() []byte IsDeletion() bool MarshalValue() ([]byte, error) String() string }
func NewTabletRow ¶
NewTabletRow constructs a new TabletRow implementation from the key as stored in the underlying storage engine.
The key received here will always contain the collection prefix (2 bytes) as well as enough bytes to contain the tablet identifier, the height and the primary key.
type TabletRowKey ¶
type TabletRowKey []byte
TabletRowKey represents a fully well-formed tablet row key as written to the underlying storage engine. This key is always represented in the same but variable format:
``` <collection (2 bytes)><tablet identifier (N bytes)><height (8 bytes)><row primary key (N bytes)> ```
Only the tablet implementation knows how to turn its series of bytes into the correct implementation.
func KeyForTabletRow ¶
func KeyForTabletRow(row TabletRow) (out TabletRowKey)
func KeyForTabletRowFromParts ¶
func KeyForTabletRowFromParts(tablet Tablet, height uint64, primaryKey []byte) (out TabletRowKey)
func (TabletRowKey) String ¶
func (k TabletRowKey) String() string
type TabletRowPrimaryKey ¶
TabletRowPrimaryKey represents a specific primary key for a given TabletRow. It's used mainly for reading a specific tablet row, providing an easy human readable version for the concrete type of TabletRow.
type WriteRequest ¶
type WriteRequest struct { SingletEntries []SingletEntry TabletRows []TabletRow Height uint64 BlockRef bstream.BlockRef }
func NewWriteRequestFromProto ¶
func NewWriteRequestFromProto(request *pbfluxdb.WriteRequest) (*WriteRequest, error)
func (*WriteRequest) AppendSingletEntry ¶
func (r *WriteRequest) AppendSingletEntry(entry SingletEntry)
func (*WriteRequest) AppendTabletRow ¶
func (r *WriteRequest) AppendTabletRow(row TabletRow)
func (*WriteRequest) ToProto ¶
func (r *WriteRequest) ToProto() (*pbfluxdb.WriteRequest, error)