kv

package
v0.0.0-...-18aa2d9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const ReadersLimit = 32000 // MDBX_READERS_LIMIT=32767
View Source
const (
	Sequence = "Sequence" // tbl_name -> seq_u64
)
View Source
const Unlim int = -1

const Unbounded []byte = nil

Variables

View Source
var (
	// TODO(AD): Remove chaindata specific
	ErrAttemptToDeleteNonDeprecatedBucket = errors.New("only buckets from dbutils.ChaindataDeprecatedTables can be deleted")

	DbSize    = metrics.GetOrCreateGauge(`db_size`)    //nolint
	TxLimit   = metrics.GetOrCreateGauge(`tx_limit`)   //nolint
	TxSpill   = metrics.GetOrCreateGauge(`tx_spill`)   //nolint
	TxUnspill = metrics.GetOrCreateGauge(`tx_unspill`) //nolint
	TxDirty   = metrics.GetOrCreateGauge(`tx_dirty`)   //nolint

	DbCommitPreparation = metrics.GetOrCreateSummary(`db_commit_seconds{phase="preparation"}`) //nolint
	//DbGCWallClock       = metrics.GetOrCreateSummary(`db_commit_seconds{phase="gc_wall_clock"}`) //nolint
	//DbGCCpuTime         = metrics.GetOrCreateSummary(`db_commit_seconds{phase="gc_cpu_time"}`)   //nolint
	//DbCommitAudit       = metrics.GetOrCreateSummary(`db_commit_seconds{phase="audit"}`)         //nolint
	DbCommitWrite  = metrics.GetOrCreateSummary(`db_commit_seconds{phase="write"}`)  //nolint
	DbCommitSync   = metrics.GetOrCreateSummary(`db_commit_seconds{phase="sync"}`)   //nolint
	DbCommitEnding = metrics.GetOrCreateSummary(`db_commit_seconds{phase="ending"}`) //nolint
	DbCommitTotal  = metrics.GetOrCreateSummary(`db_commit_seconds{phase="total"}`)  //nolint

	DbPgopsNewly   = metrics.GetOrCreateGauge(`db_pgops{phase="newly"}`)   //nolint
	DbPgopsCow     = metrics.GetOrCreateGauge(`db_pgops{phase="cow"}`)     //nolint
	DbPgopsClone   = metrics.GetOrCreateGauge(`db_pgops{phase="clone"}`)   //nolint
	DbPgopsSplit   = metrics.GetOrCreateGauge(`db_pgops{phase="split"}`)   //nolint
	DbPgopsMerge   = metrics.GetOrCreateGauge(`db_pgops{phase="merge"}`)   //nolint
	DbPgopsSpill   = metrics.GetOrCreateGauge(`db_pgops{phase="spill"}`)   //nolint
	DbPgopsUnspill = metrics.GetOrCreateGauge(`db_pgops{phase="unspill"}`) //nolint
	DbPgopsWops    = metrics.GetOrCreateGauge(`db_pgops{phase="wops"}`)    //nolint

	GcLeafMetric     = metrics.GetOrCreateGauge(`db_gc_leaf`)     //nolint
	GcOverflowMetric = metrics.GetOrCreateGauge(`db_gc_overflow`) //nolint
	GcPagesMetric    = metrics.GetOrCreateGauge(`db_gc_pages`)    //nolint

)
View Source
var DefaultTableCfg = TableCfg{
	Sequence: {},
}
View Source
var ErrChanged = fmt.Errorf("key must not change")

Functions

func BigChunks

func BigChunks(db RoDB, table string, from []byte, walker func(tx Tx, k, v []byte) (bool, error)) error

BigChunks - read `table` by big chunks - restart read transaction after each 1 minutes

func DefaultPageSize

func DefaultPageSize() uint64

func EnsureNotChangedBool

func EnsureNotChangedBool(tx GetPut, bucket string, k []byte, value bool) (ok, enabled bool, err error)

EnsureNotChangedBool - used to store immutable config flags in db. protects from human mistakes

func FirstKey

func FirstKey(tx Tx, table string) ([]byte, error)

FirstKey - candidate on move to kv.Tx interface

func GetBool

func GetBool(tx Getter, bucket string, k []byte) (enabled bool, err error)

func LastKey

func LastKey(tx Tx, table string) ([]byte, error)

LastKey - candidate on move to kv.Tx interface

func NextSubtree

func NextSubtree(in []byte) ([]byte, bool)

NextSubtree does []byte++. Returns false if overflow.

func ReadAhead

func ReadAhead(ctx context.Context, db RoDB, progress *atomic.Bool, table string, from []byte, amount uint32) (clean func())

Types

type Bucket

type Bucket string

type BucketMigrator

type BucketMigrator interface {
	BucketMigratorRO
	DropBucket(string) error
	CreateBucket(string) error
	ExistsBucket(string) (bool, error)
	ClearBucket(string) error
}

BucketMigrator used for buckets migration, don't use it in usual app code

type BucketMigratorRO

type BucketMigratorRO interface {
	ListBuckets() ([]string, error)
}

type Closer

type Closer interface {
	Close()
}

type CmpFunc

type CmpFunc func(k1, k2, v1, v2 []byte) int

type Cursor

type Cursor interface {
	First() ([]byte, []byte, error)               // First - position at first key/data item
	Seek(seek []byte) ([]byte, []byte, error)     // Seek - position at first key greater than or equal to specified key
	SeekExact(key []byte) ([]byte, []byte, error) // SeekExact - position at exact matching key if exists
	Next() ([]byte, []byte, error)                // Next - position at next key/value (can iterate over DupSort key/values automatically)
	Prev() ([]byte, []byte, error)                // Prev - position at previous key
	Last() ([]byte, []byte, error)                // Last - position at last key and last possible value
	Current() ([]byte, []byte, error)             // Current - return key/data at current cursor position

	Count() (uint64, error) // Count - fast way to calculate amount of keys in bucket. It counts all keys even if Prefix was set.

	Close()
}

Cursor - class for navigating through a database CursorDupSort are inherit this class

If methods (like First/Next/Seek) return error, then returned key SHOULD not be nil (can be []byte{} for example). Then looping code will look as: c := kv.Cursor(bucketName)

for k, v, err := c.First(); k != nil; k, v, err = c.Next() {
   if err != nil {
       return err
   }
   ... logic
}

type CursorDupSort

type CursorDupSort interface {
	Cursor

	// SeekBothExact -
	// second parameter can be nil only if searched key has no duplicates, or return error
	SeekBothExact(key, value []byte) ([]byte, []byte, error)
	SeekBothRange(key, value []byte) ([]byte, error) // SeekBothRange - exact match of the key, but range match of the value
	FirstDup() ([]byte, error)                       // FirstDup - position at first data item of current key
	NextDup() ([]byte, []byte, error)                // NextDup - position at next data item of current key
	NextNoDup() ([]byte, []byte, error)              // NextNoDup - position at first data item of next key
	PrevDup() ([]byte, []byte, error)
	PrevNoDup() ([]byte, []byte, error)
	LastDup() ([]byte, error) // LastDup - position at last data item of current key

	CountDuplicates() (uint64, error) // CountDuplicates - number of duplicates for the current key
}

CursorDupSort

Example:

for k, v, err = cursor.First(); k != nil; k, v, err = cursor.NextNoDup() {
	if err != nil {
		return err
	}
	for ; v != nil; _, v, err = cursor.NextDup() {
		if err != nil {
			return err
		}

	}
}

type DBI

type DBI uint

type DBVerbosityLvl

type DBVerbosityLvl int8

type Deleter

type Deleter interface {
	// Delete removes a single entry.
	Delete(table string, k []byte) error
}

Deleter wraps the database delete operations.

type GetPut

type GetPut interface {
	Getter
	Putter
}

type Getter

type Getter interface {
	Has

	// GetOne references a readonly section of memory that must not be accessed after txn has terminated
	GetOne(table string, key []byte) (val []byte, err error)

	// ForEach iterates over entries with keys greater or equal to fromPrefix.
	// walker is called for each eligible entry.
	// If walker returns an error:
	//   - implementations of local db - stop
	//   - implementations of remote db - do not handle this error and may finish (send all entries to client) before error happen.
	ForEach(table string, fromPrefix []byte, walker func(k, v []byte) error) error
	ForPrefix(table string, prefix []byte, walker func(k, v []byte) error) error
	ForAmount(table string, prefix []byte, amount uint32, walker func(k, v []byte) error) error
}

type Has

type Has interface {
	// Has indicates whether a key exists in the database.
	Has(table string, key []byte) (bool, error)
}

type Label

type Label string
const (
	InMem Label = "inMem"
)

type PendingMutations

type PendingMutations interface {
	StatelessRwTx
	// Flush all in-memory data into `tx`
	Flush(ctx context.Context, tx RwTx) error
	Close()
	BatchSize() int
}

PendingMutations in-memory storage of changes Later they can either be flushed to the database or abandon

type Putter

type Putter interface {
	// Put inserts or updates a single entry.
	Put(table string, k, v []byte) error
}

Putter wraps the database write operations.

type RoDB

type RoDB interface {
	Closer
	ReadOnly() bool
	View(ctx context.Context, f func(tx Tx) error) error

	// BeginRo - creates transaction
	// 	tx may be discarded by .Rollback() method
	//
	// A transaction and its cursors must only be used by a single
	// 	thread (not goroutine), and a thread may only have a single transaction at a time.
	//  It happen automatically by - because this method calls runtime.LockOSThread() inside (Rollback/Commit releases it)
	//  By this reason application code can't call runtime.UnlockOSThread() - it leads to undefined behavior.
	//
	// If this `parent` is non-NULL, the new transaction
	//	will be a nested transaction, with the transaction indicated by parent
	//	as its parent. Transactions may be nested to any level. A parent
	//	transaction and its cursors may not issue any other operations than
	//	Commit and Rollback while it has active child transactions.
	BeginRo(ctx context.Context) (Tx, error)
	AllTables() TableCfg
	PageSize() uint64

	// Pointer to the underlying C environment handle, if applicable (e.g. *C.MDBX_env)
	CHandle() unsafe.Pointer
}

RoDB - Read-only version of KV.

type RwCursor

type RwCursor interface {
	Cursor

	Put(k, v []byte) error           // Put - based on order
	Append(k []byte, v []byte) error // Append - append the given key/data pair to the end of the database. This option allows fast bulk loading when keys are already known to be in the correct order.
	Delete(k []byte) error           // Delete - short version of SeekExact+DeleteCurrent or SeekBothExact+DeleteCurrent

	// DeleteCurrent This function deletes the key/data pair to which the cursor refers.
	// This does not invalidate the cursor, so operations such as MDB_NEXT
	// can still be used on it.
	// Both MDB_NEXT and MDB_GET_CURRENT will return the same record after
	// this operation.
	DeleteCurrent() error
}

type RwCursorDupSort

type RwCursorDupSort interface {
	CursorDupSort
	RwCursor

	PutNoDupData(key, value []byte) error // PutNoDupData - inserts key without dupsort
	DeleteCurrentDuplicates() error       // DeleteCurrentDuplicates - deletes all of the data items for the current key
	DeleteExact(k1, k2 []byte) error      // DeleteExact - delete 1 value from given key
	AppendDup(key, value []byte) error    // AppendDup - same as Append, but for sorted dup data
}

type RwDB

type RwDB interface {
	RoDB

	Update(ctx context.Context, f func(tx RwTx) error) error
	UpdateNosync(ctx context.Context, f func(tx RwTx) error) error

	BeginRw(ctx context.Context) (RwTx, error)
	BeginRwNosync(ctx context.Context) (RwTx, error)
}

RwDB low-level database interface - main target is - to provide common abstraction over top of MDBX and RemoteKV.

Common pattern for short-living transactions:

 if err := db.View(ctx, func(tx ethdb.Tx) error {
    ... code which uses database in transaction
 }); err != nil {
		return err
}

Common pattern for long-living transactions:

tx, err := db.Begin()
if err != nil {
	return err
}
defer tx.Rollback()

... code which uses database in transaction

err := tx.Commit()
if err != nil {
	return err
}

type RwTx

type RwTx interface {
	Tx
	StatelessWriteTx
	BucketMigrator

	RwCursor(table string) (RwCursor, error)
	RwCursorDupSort(table string) (RwCursorDupSort, error)

	// CollectMetrics - does collect all DB-related and Tx-related metrics
	// this method exists only in RwTx to avoid concurrency
	CollectMetrics()
}

RwTx

WARNING:

  • RwTx is not threadsafe and may only be used in the goroutine that created it.
  • ReadOnly transactions do not lock goroutine to thread, RwTx does
  • User Can't call runtime.LockOSThread/runtime.UnlockOSThread in same goroutine until RwTx Commit/Rollback

type StatelessReadTx

type StatelessReadTx interface {
	Getter

	Commit() error // Commit all the operations of a transaction into the database.
	Rollback()     // Rollback - abandon all the operations of the transaction instead of saving them.

	// ReadSequence - allows to create a linear sequence of unique positive integers for each table.
	// Can be called for a read transaction to retrieve the current sequence value, and the increment must be zero.
	// Sequence changes become visible outside the current write transaction after it is committed, and discarded on abort.
	// Starts from 0.
	ReadSequence(table string) (uint64, error)
}

type StatelessRwTx

type StatelessRwTx interface {
	StatelessReadTx
	StatelessWriteTx
}

type StatelessWriteTx

type StatelessWriteTx interface {
	Putter
	Deleter

	/*
		// if need N id's:
		baseId, err := tx.IncrementSequence(bucket, N)
		if err != nil {
		   return err
		}
		for i := 0; i < N; i++ {    // if N == 0, it will work as expected
		    id := baseId + i
		    // use id
		}


		// or if need only 1 id:
		id, err := tx.IncrementSequence(bucket, 1)
		if err != nil {
		    return err
		}
		// use id
	*/
	IncrementSequence(table string, amount uint64) (uint64, error)
	Append(table string, k, v []byte) error
	AppendDup(table string, k, v []byte) error
}

type TableCfg

type TableCfg map[string]TableCfgItem

func MergeTableCfg

func MergeTableCfg(cfgs ...TableCfg) TableCfg

type TableCfgItem

type TableCfgItem struct {
	Flags TableFlags
	// AutoDupSortKeysConversion - enables some keys transformation - to change db layout without changing app code.
	// Use it wisely - it helps to do experiments with DB format faster, but better reduce amount of Magic in app.
	// If good DB format found, push app code to accept this format and then disable this property.
	AutoDupSortKeysConversion bool
	IsDeprecated              bool
	DBI                       DBI
	// DupFromLen - if user provide key of this length, then next transformation applied:
	// v = append(k[DupToLen:], v...)
	// k = k[:DupToLen]
	// And opposite at retrieval
	// Works only if AutoDupSortKeysConversion enabled
	DupFromLen int
	DupToLen   int
}

type TableFlags

type TableFlags uint
const (
	Default    TableFlags = 0x00
	ReverseKey TableFlags = 0x02
	DupSort    TableFlags = 0x04
	IntegerKey TableFlags = 0x08
	IntegerDup TableFlags = 0x20
	ReverseDup TableFlags = 0x40
)

type Tx

type Tx interface {
	StatelessReadTx
	BucketMigratorRO

	// ID returns the identifier associated with this transaction. For a
	// read-only transaction, this corresponds to the snapshot being read;
	// concurrent readers will frequently have the same transaction ID.
	ViewID() uint64

	// Cursor - creates cursor object on top of given bucket. Type of cursor - depends on bucket configuration.
	// If bucket was created with mdbx.DupSort flag, then cursor with interface CursorDupSort created
	// Otherwise - object of interface Cursor created
	//
	// Cursor, also provides a grain of magic - it can use a declarative configuration - and automatically break
	// long keys into DupSort key/values. See docs for `bucket.go:TableCfgItem`
	Cursor(table string) (Cursor, error)
	CursorDupSort(table string) (CursorDupSort, error) // CursorDupSort - can be used if bucket has mdbx.DupSort flag

	DBSize() (uint64, error)

	// Range [from, to)
	// Range(from, nil) means [from, EndOfTable)
	// Range(nil, to)   means [StartOfTable, to)
	Range(table string, fromPrefix, toPrefix []byte) (iter.KV, error)
	// Stream is like Range, but for requesting huge data (Example: full table scan). Client can't stop it.
	//Stream(table string, fromPrefix, toPrefix []byte) (iter.KV, error)
	// RangeAscend - like Range [from, to) but also allow pass Limit parameters
	// Limit -1 means Unlimited
	RangeAscend(table string, fromPrefix, toPrefix []byte, limit int) (iter.KV, error)
	//StreamAscend(table string, fromPrefix, toPrefix []byte, limit int) (iter.KV, error)
	// RangeDescend - is like Range [from, to), but expecing `from`<`to`
	// example: RangeDescend("Table", "B", "A", -1)
	RangeDescend(table string, fromPrefix, toPrefix []byte, limit int) (iter.KV, error)
	//StreamDescend(table string, fromPrefix, toPrefix []byte, limit int) (iter.KV, error)
	// Prefix - is exactly Range(Table, prefix, kv.NextSubtree(prefix))
	Prefix(table string, prefix []byte) (iter.KV, error)

	// RangeDupSort - like Range but for fixed single key and iterating over range of values
	RangeDupSort(table string, key []byte, fromPrefix, toPrefix []byte, asc order.By, limit int) (iter.KV, error)

	ForEach(table string, fromPrefix []byte, walker func(k, v []byte) error) error
	ForPrefix(table string, prefix []byte, walker func(k, v []byte) error) error
	ForAmount(table string, prefix []byte, amount uint32, walker func(k, v []byte) error) error

	// Pointer to the underlying C transaction handle (e.g. *C.MDBX_txn)
	CHandle() unsafe.Pointer
	BucketSize(table string) (uint64, error)
}

Tx WARNING:

  • Tx is not threadsafe and may only be used in the goroutine that created it
  • ReadOnly transactions do not lock goroutine to thread, RwTx does

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL