Documentation ¶
Index ¶
- Constants
- Variables
- func NewCIDIndexer(chain shared.ChainType, db *postgres.DB, ipfsMode shared.IPFSMode) (shared.CIDIndexer, error)
- func NewCIDRetriever(chain shared.ChainType, db *postgres.DB) (shared.CIDRetriever, error)
- func NewCleaner(chain shared.ChainType, db *postgres.DB) (shared.Cleaner, error)
- func NewIPLDFetcher(chain shared.ChainType, ipfsPath string, db *postgres.DB, ...) (shared.IPLDFetcher, error)
- func NewIPLDPublisher(chain shared.ChainType, ipfsPath string, db *postgres.DB, ...) (shared.IPLDPublisher, error)
- func NewPaylaodFetcher(chain shared.ChainType, client interface{}, timeout time.Duration) (shared.PayloadFetcher, error)
- func NewPayloadConverter(chain shared.ChainType) (shared.PayloadConverter, error)
- func NewPayloadStreamer(chain shared.ChainType, clientOrConfig interface{}) (shared.PayloadStreamer, chan shared.RawChainData, error)
- func NewPublicAPI(chain shared.ChainType, db *postgres.DB, ipfsPath string) (rpc.API, error)
- func NewResponseFilterer(chain shared.ChainType) (shared.ResponseFilterer, error)
- type BackFillInterface
- type BackFillService
- type Config
- type Flag
- type InfoAPI
- type PublicSuperNodeAPI
- type Service
- func (sap *Service) APIs() []rpc.API
- func (sap *Service) Chain() shared.ChainType
- func (sap *Service) Node() *core.Node
- func (sap *Service) Protocols() []p2p.Protocol
- func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData)
- func (sap *Service) Start(*p2p.Server) error
- func (sap *Service) Stop() error
- func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, ...)
- func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared.ConvertedData) error
- func (sap *Service) Unsubscribe(id rpc.ID)
- type Subscription
- type SubscriptionPayload
- type SuperNode
Constants ¶
const ( DefaultMaxBatchSize uint64 = 100 DefaultMaxBatchNumber int64 = 50 )
const ( SUPERNODE_CHAIN = "SUPERNODE_CHAIN" SUPERNODE_SYNC = "SUPERNODE_SYNC" SUPERNODE_WORKERS = "SUPERNODE_WORKERS" SUPERNODE_SERVER = "SUPERNODE_SERVER" SUPERNODE_WS_PATH = "SUPERNODE_WS_PATH" SUPERNODE_IPC_PATH = "SUPERNODE_IPC_PATH" SUPERNODE_HTTP_PATH = "SUPERNODE_HTTP_PATH" SUPERNODE_BACKFILL = "SUPERNODE_BACKFILL" SUPERNODE_FREQUENCY = "SUPERNODE_FREQUENCY" SUPERNODE_BATCH_SIZE = "SUPERNODE_BATCH_SIZE" SUPERNODE_BATCH_NUMBER = "SUPERNODE_BATCH_NUMBER" SUPERNODE_VALIDATION_LEVEL = "SUPERNODE_VALIDATION_LEVEL" SYNC_MAX_IDLE_CONNECTIONS = "SYNC_MAX_IDLE_CONNECTIONS" SYNC_MAX_OPEN_CONNECTIONS = "SYNC_MAX_OPEN_CONNECTIONS" SYNC_MAX_CONN_LIFETIME = "SYNC_MAX_CONN_LIFETIME" BACKFILL_MAX_IDLE_CONNECTIONS = "BACKFILL_MAX_IDLE_CONNECTIONS" BACKFILL_MAX_OPEN_CONNECTIONS = "BACKFILL_MAX_OPEN_CONNECTIONS" BACKFILL_MAX_CONN_LIFETIME = "BACKFILL_MAX_CONN_LIFETIME" SERVER_MAX_IDLE_CONNECTIONS = "SERVER_MAX_IDLE_CONNECTIONS" SERVER_MAX_OPEN_CONNECTIONS = "SERVER_MAX_OPEN_CONNECTIONS" SERVER_MAX_CONN_LIFETIME = "SERVER_MAX_CONN_LIFETIME" )
Env variables
const APIName = "vdb"
APIName is the namespace used for the state diffing service API
const APIVersion = "0.0.1"
APIVersion is the version of the state diffing service API
const (
PayloadChanBufferSize = 2000
)
Variables ¶
var ( Sync mode = "sync" BackFill mode = "backFill" Serve mode = "serve" )
Functions ¶
func NewCIDIndexer ¶
func NewCIDIndexer(chain shared.ChainType, db *postgres.DB, ipfsMode shared.IPFSMode) (shared.CIDIndexer, error)
NewCIDIndexer constructs a CIDIndexer for the provided chain type
func NewCIDRetriever ¶
NewCIDRetriever constructs a CIDRetriever for the provided chain type
func NewCleaner ¶
NewCleaner constructs a Cleaner for the provided chain type
func NewIPLDFetcher ¶
func NewIPLDFetcher(chain shared.ChainType, ipfsPath string, db *postgres.DB, ipfsMode shared.IPFSMode) (shared.IPLDFetcher, error)
NewIPLDFetcher constructs an IPLDFetcher for the provided chain type
func NewIPLDPublisher ¶
func NewIPLDPublisher(chain shared.ChainType, ipfsPath string, db *postgres.DB, ipfsMode shared.IPFSMode) (shared.IPLDPublisher, error)
NewIPLDPublisher constructs an IPLDPublisher for the provided chain type
func NewPaylaodFetcher ¶
func NewPaylaodFetcher(chain shared.ChainType, client interface{}, timeout time.Duration) (shared.PayloadFetcher, error)
NewPaylaodFetcher constructs a PayloadFetcher for the provided chain type
func NewPayloadConverter ¶
func NewPayloadConverter(chain shared.ChainType) (shared.PayloadConverter, error)
NewPayloadConverter constructs a PayloadConverter for the provided chain type
func NewPayloadStreamer ¶
func NewPayloadStreamer(chain shared.ChainType, clientOrConfig interface{}) (shared.PayloadStreamer, chan shared.RawChainData, error)
NewPayloadStreamer constructs a PayloadStreamer for the provided chain type
func NewPublicAPI ¶
NewPublicAPI constructs a PublicAPI for the provided chain type
func NewResponseFilterer ¶
func NewResponseFilterer(chain shared.ChainType) (shared.ResponseFilterer, error)
NewResponseFilterer constructs a ResponseFilterer for the provided chain type
Types ¶
type BackFillInterface ¶
type BackFillInterface interface { // Method for the super node to periodically check for and fill in gaps in its data using an archival node BackFill(wg *sync.WaitGroup) Stop() error }
BackFillInterface for filling in gaps in the super node
func NewBackFillService ¶
func NewBackFillService(settings *Config, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error)
NewBackFillService returns a new BackFillInterface
type BackFillService ¶
type BackFillService struct { // Interface for converting payloads into IPLD object payloads Converter shared.PayloadConverter // Interface for publishing the IPLD payloads to IPFS Publisher shared.IPLDPublisher // Interface for indexing the CIDs of the published IPLDs in Postgres Indexer shared.CIDIndexer // Interface for searching and retrieving CIDs from Postgres index Retriever shared.CIDRetriever // Interface for fetching payloads over at historical blocks; over http Fetcher shared.PayloadFetcher // Channel for forwarding backfill payloads to the ScreenAndServe process ScreenAndServeChan chan shared.ConvertedData // Check frequency GapCheckFrequency time.Duration // Size of batch fetches BatchSize uint64 // Number of goroutines BatchNumber int64 // Channel for receiving quit signal QuitChan chan bool // contains filtered or unexported fields }
BackFillService for filling in gaps in the super node
func (*BackFillService) BackFill ¶
func (bfs *BackFillService) BackFill(wg *sync.WaitGroup)
BackFill periodically checks for and fills in gaps in the super node db
func (*BackFillService) Stop ¶
func (bfs *BackFillService) Stop() error
type Config ¶
type Config struct { // Ubiquitous fields Chain shared.ChainType IPFSPath string IPFSMode shared.IPFSMode DBConfig config.Database // Server fields Serve bool ServeDBConn *postgres.DB WSEndpoint string HTTPEndpoint string IPCEndpoint string // Sync params Sync bool SyncDBConn *postgres.DB Workers int WSClient interface{} NodeInfo core.Node // Backfiller params BackFill bool BackFillDBConn *postgres.DB HTTPClient interface{} Frequency time.Duration BatchSize uint64 BatchNumber uint64 ValidationLevel int Timeout time.Duration // HTTP connection timeout in seconds }
Config struct
func NewSuperNodeConfig ¶
NewSuperNodeConfig is used to initialize a SuperNode config from a .toml file. Separate chain supernode instances need to be run with separate ipfs paths in order to avoid lock contention on the ipfs repository lockfile.
func (*Config) BackFillFields ¶
BackFillFields is used to fill in the BackFill fields of the config
type InfoAPI ¶
type InfoAPI struct{}
InfoAPI is a struct for holding super node metadata.
func NewInfoAPI ¶
func NewInfoAPI() *InfoAPI
NewInfoAPI creates a new InfoAPI.
type PublicSuperNodeAPI ¶
type PublicSuperNodeAPI struct {
// contains filtered or unexported fields
}
PublicSuperNodeAPI is the public api for the super node
func NewPublicSuperNodeAPI ¶
func NewPublicSuperNodeAPI(superNodeInterface SuperNode) *PublicSuperNodeAPI
NewPublicSuperNodeAPI creates a new PublicSuperNodeAPI with the provided underlying SyncPublishScreenAndServe process
func (*PublicSuperNodeAPI) Chain ¶
func (api *PublicSuperNodeAPI) Chain() shared.ChainType
Chain returns the chain type that this super node instance supports
func (*PublicSuperNodeAPI) Node ¶
func (api *PublicSuperNodeAPI) Node() *core.Node
Node is a public rpc method to allow transformers to fetch the node info for the super node. NOTE: this is the node info for the node that the super node is syncing from, not the node info for the super node itself.
func (*PublicSuperNodeAPI) Stream ¶
func (api *PublicSuperNodeAPI) Stream(ctx context.Context, rlpParams []byte) (*rpc.Subscription, error)
Stream is the public method to setup a subscription that fires off super node payloads as they are processed
type Service ¶
type Service struct { // Used to sync access to the Subscriptions sync.Mutex // Interface for streaming payloads over an rpc subscription Streamer shared.PayloadStreamer // Interface for converting raw payloads into IPLD object payloads Converter shared.PayloadConverter // Interface for publishing the IPLD payloads to IPFS Publisher shared.IPLDPublisher // Interface for indexing the CIDs of the published IPLDs in Postgres Indexer shared.CIDIndexer // Interface for filtering and serving data according to subscribed clients according to their specification Filterer shared.ResponseFilterer // Interface for fetching IPLD objects from IPFS IPLDFetcher shared.IPLDFetcher // Interface for searching and retrieving CIDs from Postgres index Retriever shared.CIDRetriever // Chan the processor uses to subscribe to payloads from the Streamer PayloadChan chan shared.RawChainData // Used to signal shutdown of the service QuitChan chan bool // A mapping of rpc.IDs to their subscription channels, mapped to their subscription type (hash of the StreamFilters) Subscriptions map[common.Hash]map[rpc.ID]Subscription // A mapping of subscription params hash to the corresponding subscription params SubscriptionTypes map[common.Hash]shared.SubscriptionSettings // Info for the Geth node that this super node is working with NodeInfo *core.Node // Number of publishAndIndex workers WorkerPoolSize int // contains filtered or unexported fields }
Service is the underlying struct for the super node
func (*Service) Serve ¶
func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData)
Serve listens for incoming converted data off the screenAndServePayload channel from the Sync process. It filters and sends this data to any subscribers to the service. This process can also be stood up alone, without a screenAndServePayload channel attached to a Sync process, in which case it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only.
func (*Service) Start ¶
Start is used to begin the service. This is mostly just to satisfy the node.Service interface.
func (*Service) Stop ¶
Stop is used to close down the service. This is mostly just to satisfy the node.Service interface.
func (*Service) Subscribe ¶
func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings)
Subscribe is used by the API to remotely subscribe to the service loop. The params must be rlp serializable and satisfy the SubscriptionSettings interface.
func (*Service) Sync ¶
func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared.ConvertedData) error
Sync streams incoming raw chain data and converts it for further processing. It forwards the converted data to the publishAndIndex process(es) it spins up. It also forwards the converted data to a ScreenAndServe process if there is one listening on the passed screenAndServePayload channel. This continues regardless of whether, or how many, subscribers there are.
func (*Service) Unsubscribe ¶
Unsubscribe is used by the API to remotely unsubscribe from the service loop.
type Subscription ¶
type Subscription struct { ID rpc.ID PayloadChan chan<- SubscriptionPayload QuitChan chan<- bool }
Subscription holds the information for an individual client subscription to the super node
type SubscriptionPayload ¶
type SubscriptionPayload struct { Data []byte `json:"data"` // e.g. for Ethereum rlp serialized eth.StreamPayload Height int64 `json:"height"` Err string `json:"err"` // field for error Flag Flag `json:"flag"` // field for message }
SubscriptionPayload is the struct for a super node stream payload. It carries data of a type specific to the chain being supported/queried and an error message.
func (SubscriptionPayload) BackFillComplete ¶
func (sp SubscriptionPayload) BackFillComplete() bool
func (SubscriptionPayload) Error ¶
func (sp SubscriptionPayload) Error() error
type SuperNode ¶
type SuperNode interface { // APIs(), Protocols(), Start() and Stop() node.Service // Data processing event loop Sync(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.ConvertedData) error // Pub-Sub handling event loop Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) // Method to subscribe to the service Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) // Method to unsubscribe from the service Unsubscribe(id rpc.ID) // Method to access the node info for the service Node() *core.Node // Method to access chain type Chain() shared.ChainType }
SuperNode is the top level interface for streaming, converting to IPLDs, publishing, and indexing all chain data; screening this data; and serving it up to subscribed clients. This service is compatible with the Ethereum service interface (node.Service).
func NewSuperNode ¶
NewSuperNode creates a new super_node.SuperNode interface using an underlying super_node.Service struct.