downloader

package
v1.8.4

Documentation

Overview

Package downloader handles downloading data from other nodes for sync. Sync refers to the process of catching up with other nodes; once caught up, nodes use a different process to maintain their synchronisation.

There are a few different modes for syncing

Full: Get all the blocks and apply them all to build the chain state

Fast: Get all the blocks and block receipts and insert them in the db without processing them, and at the same time download the state for a block near the head block. Once the state has been downloaded, process subsequent blocks as a full sync would in order to reach the tip of the chain.

Fast sync introduces the concept of the pivot, which is a block at some point behind the head block for which the node attempts to sync state. In geth the pivot was chosen to be 64 blocks behind the head block; the reason for choosing a point behind the head was to ensure that the block whose state is being synced is on the main chain and won't get reorged out (see https://github.com/ethereum/go-ethereum/issues/25100). It was called the pivot because the fast sync approach is used before it and full sync after it, so you can imagine the syncing strategy pivoting around that point.

In celo we don't have the problem of reorgs, but we still retain the pivot point because validator uptime scores historically had to be calculated by processing blocks from an epoch boundary. However, since https://github.com/celo-org/celo-blockchain/pull/1833 removed the requirement to process blocks from an epoch boundary, we could in fact drop the concept of the pivot.

Snap: Not currently working, but in theory works like fast sync except that nodes download a flat file to get the state, as opposed to making hundreds of thousands of individual requests for it. This should significantly speed up sync.

Light: Downloads only headers during sync and then downloads other data on demand in order to service RPC requests.

Lightest: Like light but downloads only one header per epoch, which on mainnet means one header out of every 17280 headers. This is particularly fast, taking only 20 seconds or so to get synced.

Sync process detail

Syncing is initiated with one peer (see eth.loop); the peer selected to sync with is the one with the highest total difficulty of all peers (see eth.nextSyncOp). Syncing may be cancelled and started with a different peer if a peer with a higher total difficulty becomes available.
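
A minimal sketch of that selection rule, assuming a simple peer type with an advertised total difficulty (the names syncPeer and bestPeer are illustrative, not the actual eth.nextSyncOp implementation):

package example

import "math/big"

// syncPeer stands in for a connected peer and the total difficulty it advertises.
type syncPeer struct {
	id string
	td *big.Int
}

// bestPeer returns the peer advertising the highest total difficulty,
// or nil if there are no candidates.
func bestPeer(peers []*syncPeer) *syncPeer {
	var best *syncPeer
	for _, p := range peers {
		if best == nil || p.td.Cmp(best.td) > 0 {
			best = p
		}
	}
	return best
}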

Syncing introduces the concept of a checkpoint (see params.TrustedCheckpoint). The checkpoint is a hard-coded set of trie roots that allows state sync to start before the whole header chain has been downloaded.
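
For illustration, this is roughly what such a hard-coded checkpoint looks like. The field names follow upstream go-ethereum's params.TrustedCheckpoint and are assumed to be the same in this fork; the values below are placeholders:

package example

import (
	"github.com/celo-org/celo-blockchain/common"
	"github.com/celo-org/celo-blockchain/params"
)

// exampleCheckpoint is a placeholder checkpoint: trie roots for one completed
// section of the header chain, letting state sync start before all headers arrive.
var exampleCheckpoint = &params.TrustedCheckpoint{
	SectionIndex: 451,                      // index of the header section this checkpoint covers
	SectionHead:  common.HexToHash("0x00"), // hash of the last header in the section
	CHTRoot:      common.HexToHash("0x00"), // canonical hash trie root for the section
	BloomRoot:    common.HexToHash("0x00"), // bloom trie root for the section
}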

The pivot point is the block for which fast sync syncs state; it is calculated as the first block of the epoch containing the block that is fsMinFullBlocks behind the current head block of the peer we are syncing against (fsMinFullBlocks is hardcoded to 64, chosen by the geth team to make re-orgs of the synced state unlikely).

The first step in syncing with a peer is to fetch the latest block header and pivot header. The geth implementation simply calculates the pivot as being 64 blocks (fsMinFullBlocks) before the head, and so if the head is currently < 64 then there is no valid pivot; in that case the geth code uses the head as the pivot (they say to avoid nil pointer exceptions, but it's not clear what this does to the sync). From the celo side there should never be a case without a pivot block, because we instead choose the pivot to be zero if the head is currently < 64. A sketch of this calculation follows.
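
The sketch below assumes fixed-length epochs that start at multiples of the epoch size; the real code uses the istanbul epoch helpers, so the epochFirstBlock arithmetic here is an illustrative assumption rather than the downloader's actual code.

package example

const fsMinFullBlocks = 64 // blocks behind the head, as in geth

// epochFirstBlock returns the first block number of the epoch containing n,
// assuming epochs of fixed length epochSize (17280 on mainnet).
func epochFirstBlock(n, epochSize uint64) uint64 {
	return n - (n % epochSize)
}

// pivotFor returns the block number fast sync will download state for.
func pivotFor(head, epochSize uint64) uint64 {
	if head < fsMinFullBlocks {
		return 0 // celo: no valid pivot this early, so use block zero
	}
	return epochFirstBlock(head-fsMinFullBlocks, epochSize)
}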

Next the sync finds the common ancestor (aka origin) between the node and the peer it is syncing against.

If fast syncing {

The pivot is written to a file.

If the origin turns out to be after the pivot then it is set to be just
before the pivot.

The ancient limit is set on the downloader (it would be much nicer if the
concept of ancient could be encapsulated in the database rather than
leaking here). The ancient limit defines the boundary between freezer blocks
and current blocks. Setting the ancient limit here enables "direct-ancient
mode", which I guess bypasses putting blocks into the main chain and then
having them be moved to the freezer later. I guess in full sync mode, since
all blocks need to be processed, all blocks need to go into the main
database first and only after they have been processed can they be moved to
the freezer, but since fast sync does not process all blocks that step can
be skipped.

Then, and I'm not really clear why, if the origin is greater than the last
frozen block (i.e. there is stuff in the current database beyond what is in
the freezer) the "direct-ancient mode" is disabled, maybe because it is only
applicable for nodes that are starting from scratch or have never reached
the pivot.

If the origin turns out to be lower than the most recent frozen block then
the blockchain is rewound to the origin.

Set the pivotHeader on the downloader to the pivot.

}

Then a number of goroutines are started to fetch data from the origin to the head:

fetchHeaders
fetchBodies
fetchReceipts

And one to process the headers. (processHeaders)

If fast sync {
	start a routine to process the fast sync content (processFastSyncContent)
}

If full syncing {
	start a routine to process the full sync content (processFullSyncContent)
}

These goroutines form a pipeline where the downloaded data flows as follows.

                               -> fetchBodies -> processFullSyncContent
                              /               \
fetchHeaders -> processHeaders                 \
                              \                 \
                               -> fetchReceipts --> processFastSyncContent
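
A simplified, illustrative sketch of that coordination, assuming the stages run concurrently and the first error aborts the sync; the real downloader uses its own error-collecting helper rather than errgroup, so the names and shape here are assumptions, not its actual code.

package example

import "golang.org/x/sync/errgroup"

// runSync runs each stage (fetchHeaders, fetchBodies, fetchReceipts,
// processHeaders, processFastSyncContent or processFullSyncContent) in its
// own goroutine and returns the first error any of them produces.
func runSync(stages ...func() error) error {
	var g errgroup.Group
	for _, stage := range stages {
		g.Go(stage)
	}
	return g.Wait() // first non-nil error wins; the other stages are expected to notice cancellation
}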

fetchHeaders

fetchHeaders introduces the skeleton concept. The idea is that the node requests a set of headers from the peer that are spaced out at regular intervals, and then uses all peers to request headers to fill the gaps. The header hashes allow the node to easily verify that the received headers match the chain of the main peer they are syncing against. Whether fetching skeleton headers or not, requests for headers are done in batches of up to 192 (MaxHeaderFetch) headers. (A sketch of the skeleton spacing follows the block below.)

If lightest sync {
	fetch just epoch headers up to the current epoch, then fetch all subsequent headers (no skeleton).
} else {
	fetch headers using the skeleton approach until no more skeleton headers
	are returned, then switch to requesting all subsequent headers from the
	peer.
}
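
The skeleton spacing works out so that each gap between consecutive skeleton headers can be filled by one MaxHeaderFetch-sized request. A sketch of that arithmetic (the function name and request shape are illustrative, not the downloader's actual code):

package example

const (
	maxHeaderFetch  = 192 // headers per fill request (MaxHeaderFetch)
	maxSkeletonSize = 128 // skeleton headers per request (MaxSkeletonSize)
)

// skeletonNumbers returns the block numbers of the skeleton headers requested
// when filling starts at from: every MaxHeaderFetch-th header, so the gap
// before each skeleton header is exactly one batch of fill headers.
func skeletonNumbers(from uint64) []uint64 {
	numbers := make([]uint64, 0, maxSkeletonSize)
	for i := 0; i < maxSkeletonSize; i++ {
		numbers = append(numbers, from+uint64((i+1)*maxHeaderFetch)-1)
	}
	return numbers
}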

Wait for headers to be received.

Pass the received headers to the processHeaders routine.

If no more headers are returned and the pivot state has been fully synced, then exit. (The fact that the pivot has been synced is communicated via an atomic flag set by processFastSyncContent; a minimal sketch of this signalling appears after the next step.)

Fetch more headers as done to start with.
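
A minimal sketch of that atomic signalling, assuming an int32 flag (the type and method names here are illustrative, not the downloader's actual fields):

package example

import "sync/atomic"

// pivotDone models the flag processFastSyncContent uses to tell fetchHeaders
// that the pivot state has finished syncing.
type pivotDone struct{ committed int32 }

// markCommitted is called by processFastSyncContent once the pivot state is in.
func (p *pivotDone) markCommitted() { atomic.StoreInt32(&p.committed, 1) }

// isCommitted is polled by fetchHeaders before deciding whether to exit.
func (p *pivotDone) isCommitted() bool { return atomic.LoadInt32(&p.committed) == 1 }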

processHeaders

Waits to receive headers from fetchHeaders and inserts the received headers into the header chain.

If full sync {
	request blocks for inserted headers. (fetchBodies)
}

If fast sync {
	request blocks and receipts for inserted headers. (fetchBodies & fetchReceipts)
}

processFastSyncContent

Reads fetch results from the downloader queue (each fetch result has all the data required for a block: header, txs, receipts, randomness & epochSnarkData).

Updates the pivot point if it has fallen sufficiently far behind the head.

Splits the fetch results around the pivot.

Results before the pivot are inserted with BlockChain.InsertReceiptChain (which inserts receipts, because in fast sync most blocks are not processed), and those after the pivot are handled as follows (a sketch of the split appears after this block):

If the pivot has completed syncing {
	Inserts the results after the pivot with BlockChain.InsertChain and exits.
} else {

	Start the process again, prepending the results after the pivot point to the
	newly fetched results. (Note that if the pivot point is subsequently
	updated those results will be processed as fast sync results and inserted
	via BlockChain.InsertReceiptChain, but there seems to be a problem with our
	current implementation that means the pivot would have to be 2 days old
	before it would be updated, so it looks like the list of results could grow
	a lot during this time, which could be an OOM consideration.)
}
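
A sketch of the split around the pivot described above (the fetchResult type and splitAroundPivot name here are illustrative stand-ins for the downloader's internal equivalents):

package example

// fetchResult stands in for the downloader's internal result type: everything
// needed to assemble one block (header, txs, receipts, randomness, epochSnarkData).
type fetchResult struct {
	Number uint64
}

// splitAroundPivot partitions results into those before, at and after the
// pivot. Before-pivot results go to InsertReceiptChain; after-pivot results
// are held back, or go to InsertChain once the pivot state is committed.
func splitAroundPivot(pivot uint64, results []*fetchResult) (pivotResult *fetchResult, before, after []*fetchResult) {
	for _, r := range results {
		switch {
		case r.Number < pivot:
			before = append(before, r)
		case r.Number == pivot:
			pivotResult = r
		default:
			after = append(after, r)
		}
	}
	return pivotResult, before, after
}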

fetchBodies

A routine that gets notified of bodies to fetch and calls into a beast of a function (fetchParts) to fetch batches of block bodies from different peers. Those bodies are delivered to the queue, which collates them along with other delivered data into fetch results that are then retrieved by either processFastSyncContent or processFullSyncContent.
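
A sketch of just the batching side of that, assuming requests are capped at MaxBlockFetch hashes per peer request; assignBatches is illustrative, and the real fetchParts additionally handles throttling, timeouts and re-queueing.

package example

// assignBatches splits the pending body hashes (represented here as strings
// for simplicity) into MaxBlockFetch-sized batches, each of which would be
// sent to an idle peer.
func assignBatches(pending []string) [][]string {
	const maxBlockFetch = 128 // MaxBlockFetch
	var batches [][]string
	for len(pending) > 0 {
		n := maxBlockFetch
		if n > len(pending) {
			n = len(pending)
		}
		batches = append(batches, pending[:n])
		pending = pending[n:]
	}
	return batches
}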

fetchReceipts

Like fetchBodies but for receipts.

Package downloader contains the manual full chain synchronisation.

Index

Constants

This section is empty.

Variables

var (
	MaxBlockFetch       = 128 // Amount of blocks to be fetched per retrieval request
	MaxHeaderFetch      = 192 // Amount of block headers to be fetched per retrieval request
	MaxEpochHeaderFetch = 192 // Number of epoch block headers to fetch (only used in IBFT consensus + Lightest sync mode)
	MaxSkeletonSize     = 128 // Number of header fetches to need for a skeleton assembly
	MaxReceiptFetch     = 256 // Amount of transaction receipts to allow fetching per request
	MaxStateFetch       = 384 // Amount of node state values to allow fetching per request
)

Functions

This section is empty.

Types

type BlockChain

type BlockChain interface {
	LightChain

	// HasBlock verifies a block's presence in the local chain.
	HasBlock(common.Hash, uint64) bool

	// HasFastBlock verifies a fast block's presence in the local chain.
	HasFastBlock(common.Hash, uint64) bool

	// GetBlockByHash retrieves a block from the local chain.
	GetBlockByHash(common.Hash) *types.Block

	// CurrentBlock retrieves the head block from the local chain.
	CurrentBlock() *types.Block

	// CurrentFastBlock retrieves the head fast block from the local chain.
	CurrentFastBlock() *types.Block

	// FastSyncCommitHead directly commits the head block to a certain entity.
	FastSyncCommitHead(common.Hash) error

	// InsertChain inserts a batch of blocks into the local chain.
	InsertChain(types.Blocks) (int, error)

	// InsertReceiptChain inserts a batch of receipts into the local chain.
	InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error)

	// GetBlockByNumber retrieves a block from the database by number.
	GetBlockByNumber(uint64) *types.Block

	// Snapshots returns the blockchain snapshot tree to pause it during sync.
	Snapshots() *snapshot.Tree
}

BlockChain encapsulates functions required to sync a (full or fast) blockchain.

type DoneEvent

type DoneEvent struct {
	Latest *types.Header
}

type Downloader

type Downloader struct {
	SnapSyncer *snap.Syncer // TODO(karalabe): make private! hack for now
	// contains filtered or unexported fields
}

func New

func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader

New creates a new downloader to fetch hashes and blocks from remote peers.

func (*Downloader) Cancel

func (d *Downloader) Cancel()

Cancel aborts all of the operations and waits for all download goroutines to finish before returning.

func (*Downloader) DeliverBodies

func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, randomness []*types.Randomness, epochSnarkData []*types.EpochSnarkData) (err error)

DeliverBodies injects a new batch of block bodies received from a remote node.

func (*Downloader) DeliverHeaders

func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) error

DeliverHeaders injects a new batch of block headers received from a remote node into the download schedule.

func (*Downloader) DeliverNodeData

func (d *Downloader) DeliverNodeData(id string, data [][]byte) error

DeliverNodeData injects a new batch of node state data received from a remote node.

func (*Downloader) DeliverReceipts

func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) error

DeliverReceipts injects a new batch of receipts received from a remote node.

func (*Downloader) DeliverSnapPacket added in v1.5.0

func (d *Downloader) DeliverSnapPacket(peer *snap.Peer, packet snap.Packet) error

DeliverSnapPacket is invoked from a peer's message handler when it transmits a data packet for the local node to consume.

func (*Downloader) Progress

func (d *Downloader) Progress() ethereum.SyncProgress

Progress retrieves the synchronisation boundaries, specifically the origin block where synchronisation started (which may have failed or been suspended); the block or header the sync is currently at; and the latest known block that the sync targets.

In addition, during the state download phase of fast synchronisation the number of processed and the total number of known states are also returned. Otherwise these are zero.
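
A brief usage sketch (the import path and the SyncProgress field names follow the upstream go-ethereum layout and are assumed to be the same in this fork):

package example

import (
	"fmt"

	"github.com/celo-org/celo-blockchain/eth/downloader"
)

// logProgress prints the synchronisation boundaries reported by Progress.
func logProgress(d *downloader.Downloader) {
	p := d.Progress()
	fmt.Printf("blocks %d/%d (started at %d), states %d/%d\n",
		p.CurrentBlock, p.HighestBlock, p.StartingBlock,
		p.PulledStates, p.KnownStates)
}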

func (*Downloader) RegisterLightPeer

func (d *Downloader) RegisterLightPeer(id string, version uint, peer LightPeer) error

RegisterLightPeer injects a light client peer, wrapping it so it appears as a regular peer.

func (*Downloader) RegisterPeer

func (d *Downloader) RegisterPeer(id string, version uint, peer Peer) error

RegisterPeer injects a new download peer into the set of block sources to be used for fetching hashes and blocks from.

func (*Downloader) Synchronise

func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error

Synchronise tries to sync up our local block chain with a remote peer, adding various sanity checks and wrapping it with various log entries.

func (*Downloader) Synchronising

func (d *Downloader) Synchronising() bool

Synchronising returns whether the downloader is currently retrieving blocks.

func (*Downloader) Terminate

func (d *Downloader) Terminate()

Terminate interrupts the downloader, canceling all pending operations. The downloader cannot be reused after calling Terminate.

func (*Downloader) UnregisterPeer

func (d *Downloader) UnregisterPeer(id string) error

UnregisterPeer removes a peer from the known list, preventing any action from the specified peer. An effort is also made to return any pending fetches into the queue.

type FailedEvent

type FailedEvent struct{ Err error }

type LightChain

type LightChain interface {
	// HasHeader verifies a header's presence in the local chain.
	HasHeader(common.Hash, uint64) bool

	// GetHeaderByHash retrieves a header from the local chain.
	GetHeaderByHash(common.Hash) *types.Header

	// GetHeaderByNumber retrieves a header from the local chain by number.
	GetHeaderByNumber(uint64) *types.Header

	// CurrentHeader retrieves the head header from the local chain.
	CurrentHeader() *types.Header

	// GetTd returns the total difficulty of a local block.
	GetTd(common.Hash, uint64) *big.Int

	// InsertHeaderChain inserts a batch of headers into the local chain.
	InsertHeaderChain([]*types.Header, int, bool) (int, error)

	Config() *params.ChainConfig
	// SetHead rewinds the local chain to a new head.
	SetHead(uint64) error
}

LightChain encapsulates functions required to synchronise a light chain.

type LightPeer

type LightPeer interface {
	Head() (common.Hash, *big.Int)
	RequestHeadersByHash(common.Hash, int, int, bool) error
	RequestHeadersByNumber(uint64, int, int, bool) error
}

LightPeer encapsulates the methods required to synchronise with a remote light peer.

type Peer

type Peer interface {
	LightPeer
	RequestBodies([]common.Hash) error
	RequestReceipts([]common.Hash) error
	RequestNodeData([]common.Hash) error
}

Peer encapsulates the methods required to synchronise with a remote full peer.

type PublicDownloaderAPI

type PublicDownloaderAPI struct {
	// contains filtered or unexported fields
}

PublicDownloaderAPI provides an API which gives information about the current synchronisation status. It offers only methods that operate on data that can be available to anyone without security risks.

func NewPublicDownloaderAPI

func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI

NewPublicDownloaderAPI creates a new PublicDownloaderAPI. The API has an internal event loop that listens for events from the downloader through the global event mux. In case it receives one of these events it broadcasts it to all syncing subscriptions that are installed through the installSyncSubscription channel.

func (*PublicDownloaderAPI) SubscribeSyncStatus

func (api *PublicDownloaderAPI) SubscribeSyncStatus(status chan interface{}) *SyncStatusSubscription

SubscribeSyncStatus creates a subscription that will broadcast new synchronisation updates. The given channel must receive interface values.
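
A brief usage sketch (the import path is assumed; the concrete values received on the channel depend on which downloader events are broadcast):

package example

import "github.com/celo-org/celo-blockchain/eth/downloader"

// waitForFirstUpdate subscribes to sync status updates and returns the first
// value broadcast on the channel.
func waitForFirstUpdate(api *downloader.PublicDownloaderAPI) interface{} {
	statuses := make(chan interface{})
	sub := api.SubscribeSyncStatus(statuses)
	defer sub.Unsubscribe()
	return <-statuses // e.g. a SyncingResult once syncing starts
}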

func (*PublicDownloaderAPI) Syncing

func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error)

Syncing provides information when this node starts synchronising with the Ethereum network and when it's finished.

type StartEvent

type StartEvent struct{}

type SyncMode

type SyncMode uint32

SyncMode represents the synchronisation mode of the downloader. It is a uint32 as it is used with atomic operations.

const (
	FullSync     SyncMode = iota // Synchronise the entire blockchain history from full blocks
	FastSync                     // Quickly download the headers, full sync only at the chain head
	SnapSync                     // Download the chain and the state via compact snapshots
	LightSync                    // Download only the headers and terminate afterwards
	LightestSync                 // Synchronise one block per Epoch (Celo-specific mode)
)
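
A brief usage sketch for selecting a mode from text, e.g. from a CLI flag. The accepted strings ("full", "fast", "snap", "light", "lightest") are assumed from the mode names above rather than taken from the implementation:

package example

import (
	"log"

	"github.com/celo-org/celo-blockchain/eth/downloader"
)

// parseMode converts a textual sync mode into a SyncMode, failing on unknown input.
func parseMode(s string) downloader.SyncMode {
	var mode downloader.SyncMode
	if err := mode.UnmarshalText([]byte(s)); err != nil {
		log.Fatalf("unknown sync mode %q: %v", s, err)
	}
	return mode
}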

func (SyncMode) IsValid

func (mode SyncMode) IsValid() bool

func (SyncMode) MarshalText

func (mode SyncMode) MarshalText() ([]byte, error)

func (SyncMode) String

func (mode SyncMode) String() string

String implements the stringer interface.

func (SyncMode) SyncFullBlockChain

func (mode SyncMode) SyncFullBlockChain() bool

Returns true if the full blocks (and not just headers) are fetched. If a mode returns true here then it will return true for `SyncFullHeaderChain` as well.

func (SyncMode) SyncFullHeaderChain

func (mode SyncMode) SyncFullHeaderChain() bool

Returns true if all headers, and not just a small discontinuous set of headers, are fetched.
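
A plausible mapping of the modes to these two predicates, inferred from the mode descriptions above rather than from the implementation, so treat it as an assumption:

package example

import "github.com/celo-org/celo-blockchain/eth/downloader"

// Inferred: the full, fast and snap modes fetch full blocks; only the
// lightest mode skips parts of the header chain.
func syncFullBlockChain(mode downloader.SyncMode) bool {
	return mode == downloader.FullSync || mode == downloader.FastSync || mode == downloader.SnapSync
}

func syncFullHeaderChain(mode downloader.SyncMode) bool {
	return mode.IsValid() && mode != downloader.LightestSync
}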

func (*SyncMode) UnmarshalText

func (mode *SyncMode) UnmarshalText(text []byte) error

type SyncStatusSubscription

type SyncStatusSubscription struct {
	// contains filtered or unexported fields
}

SyncStatusSubscription represents a syncing subscription.

func (*SyncStatusSubscription) Unsubscribe

func (s *SyncStatusSubscription) Unsubscribe()

Unsubscribe uninstalls the subscription from the DownloadAPI event loop. The status channel that was passed to subscribeSyncStatus isn't used anymore after this method returns.

type SyncingResult

type SyncingResult struct {
	Syncing bool                  `json:"syncing"`
	Status  ethereum.SyncProgress `json:"status"`
}

SyncingResult provides information about the current synchronisation status for this node.
