ingest

package
v0.0.0-...-b785d8f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 1, 2018 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Overview

Package ingest contains the ingestion system for horizon. This system takes data produced by the connected stellar-core database, transforms it and inserts it into the horizon database.

Index

Constants

View Source
const (
	// CurrentVersion reflects the latest version of the ingestion
	// algorithm. As rows are ingested into the horizon database, this version is
	// used to tag them.  In the future, any breaking changes introduced by a
	// developer should be accompanied by an increase in this value.
	//
	// Scripts, that have yet to be ported to this codebase can then be leveraged
	// to re-ingest old data with the new algorithm, providing a seamless
	// transition when the ingested data's structure changes.
	CurrentVersion = 11
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AssetsModified

type AssetsModified map[string]xdr.Asset

AssetsModified tracks all the assets modified during a cycle of ingestion

func (AssetsModified) IngestOperation

func (assetsModified AssetsModified) IngestOperation(err error, op *xdr.Operation, source *xdr.AccountId, coreQ *core.Q) error

IngestOperation updates the assetsModified using the passed in operation

func (AssetsModified) UpdateAssetStats

func (assetsModified AssetsModified) UpdateAssetStats(is *Session)

UpdateAssetStats updates the db with the latest asset stats for the assets that were modified

type Cursor

type Cursor struct {
	// FirstLedger is the beginning of the range of ledgers (inclusive) that will
	// attempt to be ingested in this session.
	FirstLedger int32
	// LastLedger is the end of the range of ledgers (inclusive) that will
	// attempt to be ingested in this session.
	LastLedger int32
	// DB is the stellar-core db that data is ingested from.
	DB *db.Session

	Metrics        *IngesterMetrics
	AssetsModified AssetsModified

	// Err is the error that caused this iteration to fail, if any.
	Err error
	// contains filtered or unexported fields
}

Cursor iterates through a stellar core database's ledgers

func NewCursor

func NewCursor(first, last int32, i *System) *Cursor

NewCursor initializes a new ingestion cursor

func (*Cursor) BeforeAndAfter

func (c *Cursor) BeforeAndAfter(target xdr.LedgerKey) (
	before *xdr.LedgerEntry,
	after *xdr.LedgerEntry,
	err error,
)

BeforeAndAfter loads the ledger entry for `target` before the current operation was applied and after the operation was applied.

func (*Cursor) InLedger

func (c *Cursor) InLedger() bool

InLedger returns true if the cursor is on a ledger.

func (*Cursor) InOperation

func (c *Cursor) InOperation() bool

InOperation returns true if the cursor is on a operation. Will return false after advancing to a new transaction but before advancing on to the transaciton's first operation.

func (*Cursor) InTransaction

func (c *Cursor) InTransaction() bool

InTransaction returns true if the cursor is pointing to a transaction. This will return false after advancing to a new ledger but prior to advancing into the ledger's first transaction.

func (*Cursor) Ledger

func (c *Cursor) Ledger() *core.LedgerHeader

Ledger returns the current ledger

func (*Cursor) LedgerID

func (c *Cursor) LedgerID() int64

LedgerID returns the current ledger's id, as used by the history system.

func (*Cursor) LedgerRange

func (c *Cursor) LedgerRange() (start int64, end int64)

LedgerRange returns the beginning and end of id values that map to the current ledger. Useful for clearing a ledgers worth of data.

func (*Cursor) LedgerSequence

func (c *Cursor) LedgerSequence() int32

LedgerSequence returns the current ledger's sequence

func (*Cursor) NextLedger

func (c *Cursor) NextLedger() bool

NextLedger advances `c` to the next ledger in the iteration, loading a new LedgerBundle from the core database. Returns false if an error occurs or the iteration is complete.

func (*Cursor) NextOp

func (c *Cursor) NextOp() bool

NextOp advances `c` to the next operation in the current transaction. Returns false if the current transaction has nothing left to visit.

func (*Cursor) NextTx

func (c *Cursor) NextTx() bool

NextTx advances `c` to the next transaction in the current ledger. Returns false if the current ledger has no transactions left to visit.

func (*Cursor) Operation

func (c *Cursor) Operation() *xdr.Operation

Operation returns the current operation

func (*Cursor) OperationChanges

func (c *Cursor) OperationChanges() xdr.LedgerEntryChanges

OperationChanges returns all of LedgerEntryChanges that occurred in the course of applying the current operation.

func (*Cursor) OperationCount

func (c *Cursor) OperationCount() int

OperationCount returns the count of operations in the current transaction

func (*Cursor) OperationID

func (c *Cursor) OperationID() int64

OperationID returns the current operations id, as used by the history system.

func (*Cursor) OperationOrder

func (c *Cursor) OperationOrder() int32

OperationOrder returns the order of the current operation amongst the current transaction's operations.

func (*Cursor) OperationResult

func (c *Cursor) OperationResult() *xdr.OperationResultTr

OperationResult returns the current operation's result record

func (*Cursor) OperationSourceAccount

func (c *Cursor) OperationSourceAccount() xdr.AccountId

OperationSourceAccount returns the current operation's effective source account (i.e. default's to the transaction's source account).

func (*Cursor) OperationType

func (c *Cursor) OperationType() xdr.OperationType

OperationType returns the current operation type

func (*Cursor) Operations

func (c *Cursor) Operations() []xdr.Operation

Operations returns the current transactions operations.

func (*Cursor) SuccessfulLedgerOperationCount

func (c *Cursor) SuccessfulLedgerOperationCount() (ret int)

SuccessfulLedgerOperationCount returns the count of operations in the current ledger

func (*Cursor) SuccessfulTransactionCount

func (c *Cursor) SuccessfulTransactionCount() (ret int)

SuccessfulTransactionCount returns the count of transactions in the current ledger that succeeded.

func (*Cursor) Transaction

func (c *Cursor) Transaction() *core.Transaction

Transaction returns the current transaction

func (*Cursor) TransactionFee

func (c *Cursor) TransactionFee() *core.TransactionFee

TransactionFee returns the txfeehistory row for the current transaction.

func (*Cursor) TransactionID

func (c *Cursor) TransactionID() int64

TransactionID returns the current tranaction's id, as used by the history system.

func (*Cursor) TransactionMetaBundle

func (c *Cursor) TransactionMetaBundle() *meta.Bundle

TransactionMetaBundle provides easier access to the meta data regarding the application of the current transaction.

func (*Cursor) TransactionSourceAccount

func (c *Cursor) TransactionSourceAccount() xdr.AccountId

TransactionSourceAccount returns the current transaction's source account id

type EffectIngestion

type EffectIngestion struct {
	Dest        *Ingestion
	OperationID int64
	// contains filtered or unexported fields
}

EffectIngestion is a helper struct to smooth the ingestion of effects. this struct will track what the correct operation to use and order to use when adding effects into an ingestion.

func (*EffectIngestion) Add

func (ei *EffectIngestion) Add(aid xdr.AccountId, typ history.EffectType, details interface{}) bool

Add writes an effect to the database while automatically tracking the index to use.

func (*EffectIngestion) Finish

func (ei *EffectIngestion) Finish() error

Finish marks this ingestion as complete, returning any error that was recorded.

type IngesterMetrics

type IngesterMetrics struct {
	ClearLedgerTimer  metrics.Timer
	IngestLedgerTimer metrics.Timer
	LoadLedgerTimer   metrics.Timer
}

IngesterMetrics tracks all the metrics for the ingestion subsystem

type Ingestion

type Ingestion struct {
	// DB is the sql connection to be used for writing any rows into the horizon
	// database.
	DB *db.Session
	// contains filtered or unexported fields
}

Ingestion receives write requests from a Session

func (*Ingestion) Clear

func (ingest *Ingestion) Clear(start int64, end int64) error

Clear removes a range of data from the history database, exclusive of the end id provided.

func (*Ingestion) ClearAll

func (ingest *Ingestion) ClearAll() error

ClearAll clears the entire history database

func (*Ingestion) Close

func (ingest *Ingestion) Close() error

Close finishes the current transaction and finishes this ingestion.

func (*Ingestion) Effect

func (ingest *Ingestion) Effect(aid int64, opid int64, order int, typ history.EffectType, details interface{}) error

Effect adds a new row into the `history_effects` table.

func (*Ingestion) Flush

func (ingest *Ingestion) Flush() error

Flush writes the currently buffered rows to the db, and if successful starts a new transaction.

func (*Ingestion) Ledger

func (ingest *Ingestion) Ledger(
	id int64,
	header *core.LedgerHeader,
	txs int,
	ops int,
) error

Ledger adds a ledger to the current ingestion

func (*Ingestion) Operation

func (ingest *Ingestion) Operation(
	id int64,
	txid int64,
	order int32,
	source xdr.AccountId,
	typ xdr.OperationType,
	details map[string]interface{},

) error

Operation ingests the provided operation data into a new row in the `history_operations` table

func (*Ingestion) OperationParticipants

func (ingest *Ingestion) OperationParticipants(op int64, aids []xdr.AccountId) error

OperationParticipants ingests the provided accounts `aids` as participants of operation with id `op`, creating a new row in the `history_operation_participants` table.

func (*Ingestion) Rollback

func (ingest *Ingestion) Rollback() (err error)

Rollback aborts this ingestions transaction

func (*Ingestion) Start

func (ingest *Ingestion) Start() (err error)

Start makes the ingestion reeady, initializing the insert builders and tx

func (*Ingestion) Trade

func (ingest *Ingestion) Trade(
	opid int64,
	order int32,
	buyer xdr.AccountId,
	trade xdr.ClaimOfferAtom,
	ledgerClosedAt int64,
) error

Trade records a trade into the history_trades table

func (*Ingestion) Transaction

func (ingest *Ingestion) Transaction(
	id int64,
	tx *core.Transaction,
	fee *core.TransactionFee,
) error

Transaction ingests the provided transaction data into a new row in the `history_transactions` table

func (*Ingestion) TransactionParticipants

func (ingest *Ingestion) TransactionParticipants(tx int64, aids []xdr.AccountId) error

TransactionParticipants ingests the provided account ids as participants of transaction with id `tx`, creating a new row in the `history_transaction_participants` table.

type LedgerBundle

type LedgerBundle struct {
	Sequence        int32
	Header          core.LedgerHeader
	TransactionFees []core.TransactionFee
	Transactions    []core.Transaction
}

LedgerBundle represents a single ledger's worth of novelty created by one ledger close

func (*LedgerBundle) Load

func (lb *LedgerBundle) Load(db *db.Session) error

Load runs queries against `core` to fill in the records of the bundle.

type Session

type Session struct {
	Cursor    *Cursor
	Ingestion *Ingestion
	// Network is the passphrase for the network being imported
	Network string

	// StellarCoreURL is the http endpoint of the stellar-core that data is being
	// ingested from.
	StellarCoreURL string

	// ClearExisting causes the session to clear existing data from the horizon db
	// when the session is run.
	ClearExisting bool

	// SkipCursorUpdate causes the session to skip
	// reporting the "last imported ledger" cursor to
	// stellar-core
	SkipCursorUpdate bool

	// Metrics is a reference to where the session should record its metric information
	Metrics *IngesterMetrics

	// Err is the error that caused this session to fail, if any.
	Err error

	// Ingested is the number of ledgers that were successfully ingested during
	// this session.
	Ingested int
}

Session represents a single attempt at ingesting data into the history database.

func NewSession

func NewSession(i *System) *Session

NewSession initialize a new ingestion session

func (*Session) Run

func (is *Session) Run()

Run starts an attempt to ingest the range of ledgers specified in this session.

type System

type System struct {
	// HorizonDB is the connection to the horizon database that ingested data will
	// be written to.
	HorizonDB *db.Session

	// CoreDB is the stellar-core db that data is ingested from.
	CoreDB *db.Session

	Metrics IngesterMetrics

	// Network is the passphrase for the network being imported
	Network string

	// StellarCoreURL is the http endpoint of the stellar-core that data is being
	// ingested from.
	StellarCoreURL string

	// SkipCursorUpdate causes the ingestor to skip
	// reporting the "last imported ledger" cursor to
	// stellar-core
	SkipCursorUpdate bool

	// HistoryRetentionCount is the desired minimum number of ledgers to
	// keep in the history database, working backwards from the latest core
	// ledger.  0 represents "all ledgers".
	HistoryRetentionCount uint
	// contains filtered or unexported fields
}

System represents the data ingestion subsystem of horizon.

func New

func New(network string, coreURL string, core, horizon *db.Session) *System

New initializes the ingester, causing it to begin polling the stellar-core database for now ledgers and ingesting data into the horizon database.

func (*System) Backfill

func (i *System) Backfill(n uint) error

Backfill ingests history in reverse chronological order, from the current horizon elder query for `n` ledgers

func (*System) ClearAll

func (i *System) ClearAll() error

ClearAll removes all previously ingested historical data from the horizon database.

func (*System) RebaseHistory

func (i *System) RebaseHistory(sequence int32) error

RebaseHistory re-establishes horizon's history database using the provided sequence as a starting point.

func (*System) ReingestAll

func (i *System) ReingestAll() (int, error)

ReingestAll re-ingests all ledgers

func (*System) ReingestOutdated

func (i *System) ReingestOutdated() (n int, err error)

ReingestOutdated finds old ledgers and reimports them.

func (*System) ReingestRange

func (i *System) ReingestRange(start, end int32) (int, error)

ReingestRange reingests a range of ledgers, from `start` to `end`, inclusive.

func (*System) ReingestSingle

func (i *System) ReingestSingle(sequence int32) error

ReingestSingle re-ingests a single ledger

func (*System) Tick

func (i *System) Tick() *Session

Tick triggers the ingestion system to ingest any new ledger data, provided that there currently is not an import session in progress.

Directories

Path Synopsis
Package participants contains functions to derive a set of "participant" addresses for various data structures in the Stellar network's ledger.
Package participants contains functions to derive a set of "participant" addresses for various data structures in the Stellar network's ledger.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL