watcher

package
v0.0.11-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 22, 2020 License: AGPL-3.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
// Default batch limits.
// NOTE(review): the types match BackFillService.BatchSize (uint64) and
// BackFillService.BatchNumber (int64), so these are presumably the fallbacks
// for those backfill settings — confirm against the config loading code.
const (
	DefaultMaxBatchSize   uint64 = 100
	DefaultMaxBatchNumber int64  = 50
)
View Source
// Names of the environment variables recognized by this package.
// Each constant's value is the literal variable name.
const (
	// Super node settings.
	SUPERNODE_CHAIN            = "SUPERNODE_CHAIN"
	SUPERNODE_SYNC             = "SUPERNODE_SYNC"
	SUPERNODE_WORKERS          = "SUPERNODE_WORKERS"
	SUPERNODE_SERVER           = "SUPERNODE_SERVER"
	SUPERNODE_WS_PATH          = "SUPERNODE_WS_PATH"
	SUPERNODE_IPC_PATH         = "SUPERNODE_IPC_PATH"
	SUPERNODE_HTTP_PATH        = "SUPERNODE_HTTP_PATH"
	SUPERNODE_BACKFILL         = "SUPERNODE_BACKFILL"
	SUPERNODE_FREQUENCY        = "SUPERNODE_FREQUENCY"
	SUPERNODE_BATCH_SIZE       = "SUPERNODE_BATCH_SIZE"
	SUPERNODE_BATCH_NUMBER     = "SUPERNODE_BATCH_NUMBER"
	SUPERNODE_VALIDATION_LEVEL = "SUPERNODE_VALIDATION_LEVEL"

	// Connection limits for the sync process
	// (presumably its database connection pool — TODO confirm).
	SYNC_MAX_IDLE_CONNECTIONS = "SYNC_MAX_IDLE_CONNECTIONS"
	SYNC_MAX_OPEN_CONNECTIONS = "SYNC_MAX_OPEN_CONNECTIONS"
	SYNC_MAX_CONN_LIFETIME    = "SYNC_MAX_CONN_LIFETIME"

	// Connection limits for the backfill process
	// (presumably its database connection pool — TODO confirm).
	BACKFILL_MAX_IDLE_CONNECTIONS = "BACKFILL_MAX_IDLE_CONNECTIONS"
	BACKFILL_MAX_OPEN_CONNECTIONS = "BACKFILL_MAX_OPEN_CONNECTIONS"
	BACKFILL_MAX_CONN_LIFETIME    = "BACKFILL_MAX_CONN_LIFETIME"

	// Connection limits for the server process
	// (presumably its database connection pool — TODO confirm).
	SERVER_MAX_IDLE_CONNECTIONS = "SERVER_MAX_IDLE_CONNECTIONS"
	SERVER_MAX_OPEN_CONNECTIONS = "SERVER_MAX_OPEN_CONNECTIONS"
	SERVER_MAX_CONN_LIFETIME    = "SERVER_MAX_CONN_LIFETIME"
)

Env variables

View Source
// APIName is the namespace used for the service API.
const APIName = "vdb"

APIName is the namespace used for the state diffing service API

View Source
// APIVersion is the version string of the service API.
const APIVersion = "0.0.1"

APIVersion is the version of the state diffing service API

View Source
const (
	// PayloadChanBufferSize is a channel buffer capacity.
	// NOTE(review): presumably applied to the payload channels created by this
	// package (e.g. Service.PayloadChan) — confirm at the make(chan ...) sites.
	PayloadChanBufferSize = 2000
)

Variables

View Source
// Values of the unexported mode type, one per process kind.
// NOTE(review): these are declared with var although they look like
// constants; confirm whether anything reassigns them before changing to const.
var (
	Sync     mode = "sync"
	BackFill mode = "backFill"
	Serve    mode = "serve"
)

Functions

func NewCIDIndexer

func NewCIDIndexer(chain shared.ChainType, db *postgres.DB, ipfsMode shared.IPFSMode) (shared.CIDIndexer, error)

NewCIDIndexer constructs a CIDIndexer for the provided chain type

func NewCIDRetriever

func NewCIDRetriever(chain shared.ChainType, db *postgres.DB) (shared.CIDRetriever, error)

NewCIDRetriever constructs a CIDRetriever for the provided chain type

func NewCleaner

func NewCleaner(chain shared.ChainType, db *postgres.DB) (shared.Cleaner, error)

NewCleaner constructs a Cleaner for the provided chain type

func NewIPLDFetcher

func NewIPLDFetcher(chain shared.ChainType, ipfsPath string, db *postgres.DB, ipfsMode shared.IPFSMode) (shared.IPLDFetcher, error)

NewIPLDFetcher constructs an IPLDFetcher for the provided chain type

func NewIPLDPublisher

func NewIPLDPublisher(chain shared.ChainType, ipfsPath string, db *postgres.DB, ipfsMode shared.IPFSMode) (shared.IPLDPublisher, error)

NewIPLDPublisher constructs an IPLDPublisher for the provided chain type

func NewPaylaodFetcher

func NewPaylaodFetcher(chain shared.ChainType, client interface{}, timeout time.Duration) (shared.PayloadFetcher, error)

NewPaylaodFetcher constructs a PayloadFetcher for the provided chain type

func NewPayloadConverter

func NewPayloadConverter(chain shared.ChainType) (shared.PayloadConverter, error)

NewPayloadConverter constructs a PayloadConverter for the provided chain type

func NewPayloadStreamer

func NewPayloadStreamer(chain shared.ChainType, clientOrConfig interface{}) (shared.PayloadStreamer, chan shared.RawChainData, error)

NewPayloadStreamer constructs a PayloadStreamer for the provided chain type

func NewPublicAPI

func NewPublicAPI(chain shared.ChainType, db *postgres.DB, ipfsPath string) (rpc.API, error)

NewPublicAPI constructs a PublicAPI for the provided chain type

func NewResponseFilterer

func NewResponseFilterer(chain shared.ChainType) (shared.ResponseFilterer, error)

NewResponseFilterer constructs a ResponseFilterer for the provided chain type

Types

type BackFillInterface

// BackFillInterface is the interface for filling in gaps in the super node's data.
type BackFillInterface interface {
	// BackFill lets the super node periodically check for and fill in gaps
	// in its data using an archival node.
	BackFill(wg *sync.WaitGroup)
	// Stop returns an error; presumably it signals the backfill process to
	// shut down — TODO confirm against the implementation.
	Stop() error
}

BackFillInterface for filling in gaps in the super node

func NewBackFillService

func NewBackFillService(settings *Config, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error)

NewBackFillService returns a new BackFillInterface

type BackFillService

// BackFillService fills in gaps in the super node's data.
// Its methods BackFill and Stop use a pointer receiver.
type BackFillService struct {
	// Converts payloads into IPLD object payloads
	Converter shared.PayloadConverter
	// Publishes the IPLD payloads to IPFS
	Publisher shared.IPLDPublisher
	// Indexes the CIDs of the published IPLDs in Postgres
	Indexer shared.CIDIndexer
	// Searches and retrieves CIDs from the Postgres index
	Retriever shared.CIDRetriever
	// Fetches payloads at historical blocks, over HTTP
	Fetcher shared.PayloadFetcher
	// Channel for forwarding backfill payloads to the ScreenAndServe process
	ScreenAndServeChan chan shared.ConvertedData
	// How often to check for gaps
	GapCheckFrequency time.Duration
	// Size of batch fetches
	BatchSize uint64
	// Number of goroutines
	BatchNumber int64
	// Channel for receiving the quit signal
	QuitChan chan bool
	// contains filtered or unexported fields
}

BackFillService for filling in gaps in the super node

func (*BackFillService) BackFill

func (bfs *BackFillService) BackFill(wg *sync.WaitGroup)

BackFill periodically checks for and fills in gaps in the super node db

func (*BackFillService) Stop

func (bfs *BackFillService) Stop() error

type Config

// Config holds the settings for a super node instance. It is grouped into
// fields shared by every process, then server, sync, and backfill fields.
type Config struct {
	// Ubiquitous fields
	Chain    shared.ChainType
	IPFSPath string
	IPFSMode shared.IPFSMode
	DBConfig config.Database
	// Server fields
	Serve        bool
	ServeDBConn  *postgres.DB
	WSEndpoint   string
	HTTPEndpoint string
	IPCEndpoint  string
	// Sync params
	Sync       bool
	SyncDBConn *postgres.DB
	Workers    int
	WSClient   interface{}
	NodeInfo   core.Node
	// Backfiller params
	BackFill        bool
	BackFillDBConn  *postgres.DB
	HTTPClient      interface{}
	Frequency       time.Duration
	BatchSize       uint64
	// NOTE(review): uint64 here, but BackFillService.BatchNumber and
	// DefaultMaxBatchNumber are int64 — confirm the conversion is range-checked.
	BatchNumber     uint64
	ValidationLevel int
	// NOTE(review): time.Duration counts nanoseconds; confirm the configured
	// number of seconds is multiplied by time.Second where this is set.
	Timeout         time.Duration // HTTP connection timeout in seconds
}

Config struct

func NewSuperNodeConfig

func NewSuperNodeConfig() (*Config, error)

NewSuperNodeConfig is used to initialize a SuperNode config from a .toml file. Separate chain supernode instances need to be run with separate IPFS paths in order to avoid lock contention on the IPFS repository lockfile.

func (*Config) BackFillFields

func (c *Config) BackFillFields() error

BackFillFields is used to fill in the BackFill fields of the config

type Flag

// Flag is the message flag carried in SubscriptionPayload.Flag.
type Flag int32
const (
	// EmptyFlag is the zero value.
	EmptyFlag Flag = iota
	// BackFillCompleteFlag has the value 1.
	// NOTE(review): presumably what SubscriptionPayload.BackFillComplete tests for — confirm.
	BackFillCompleteFlag
)

type InfoAPI

// InfoAPI is a field-less struct whose methods (Modules, NodeInfo, Version)
// report metadata about the super node.
type InfoAPI struct{}

InfoAPI is a struct for holding super node metadata.

func NewInfoAPI

func NewInfoAPI() *InfoAPI

NewInfoAPI creates a new InfoAPI.

func (*InfoAPI) Modules

func (iapi *InfoAPI) Modules() map[string]string

Modules returns modules supported by this api

func (*InfoAPI) NodeInfo

func (iapi *InfoAPI) NodeInfo() *p2p.NodeInfo

NodeInfo gathers and returns a collection of metadata for the super node

func (*InfoAPI) Version

func (iapi *InfoAPI) Version() string

Version returns the version of the super node

type PublicSuperNodeAPI

// PublicSuperNodeAPI is the public API for the super node.
// It is constructed by NewPublicSuperNodeAPI from a SuperNode.
type PublicSuperNodeAPI struct {
	// contains filtered or unexported fields
}

PublicSuperNodeAPI is the public api for the super node

func NewPublicSuperNodeAPI

func NewPublicSuperNodeAPI(superNodeInterface SuperNode) *PublicSuperNodeAPI

NewPublicSuperNodeAPI creates a new PublicSuperNodeAPI with the provided underlying SyncPublishScreenAndServe process

func (*PublicSuperNodeAPI) Chain

func (api *PublicSuperNodeAPI) Chain() shared.ChainType

Chain returns the chain type that this super node instance supports

func (*PublicSuperNodeAPI) Node

func (api *PublicSuperNodeAPI) Node() *core.Node

Node is a public rpc method to allow transformers to fetch the node info for the super node. NOTE: this is the node info for the node that the super node is syncing from, not the node info for the super node itself.

func (*PublicSuperNodeAPI) Stream

func (api *PublicSuperNodeAPI) Stream(ctx context.Context, rlpParams []byte) (*rpc.Subscription, error)

Stream is the public method to setup a subscription that fires off super node payloads as they are processed

type Service

// Service is the underlying struct for the super node.
// It embeds a sync.Mutex, so it must be passed by pointer and never copied.
type Service struct {
	// Embedded mutex used to synchronize access to the Subscriptions
	sync.Mutex
	// Interface for streaming payloads over an rpc subscription
	Streamer shared.PayloadStreamer
	// Interface for converting raw payloads into IPLD object payloads
	Converter shared.PayloadConverter
	// Interface for publishing the IPLD payloads to IPFS
	Publisher shared.IPLDPublisher
	// Interface for indexing the CIDs of the published IPLDs in Postgres
	Indexer shared.CIDIndexer
	// Interface for filtering data and serving it to subscribed clients according to their specification
	Filterer shared.ResponseFilterer
	// Interface for fetching IPLD objects from IPFS
	IPLDFetcher shared.IPLDFetcher
	// Interface for searching and retrieving CIDs from Postgres index
	Retriever shared.CIDRetriever
	// Chan the processor uses to subscribe to payloads from the Streamer
	PayloadChan chan shared.RawChainData
	// Used to signal shutdown of the service
	QuitChan chan bool
	// Subscriptions keyed first by subscription type (hash of the StreamFilters), then by rpc.ID
	Subscriptions map[common.Hash]map[rpc.ID]Subscription
	// A mapping of subscription params hash to the corresponding subscription params
	SubscriptionTypes map[common.Hash]shared.SubscriptionSettings
	// Info for the Geth node that this super node is working with
	NodeInfo *core.Node
	// Number of publishAndIndex workers
	WorkerPoolSize int
	// contains filtered or unexported fields
}

Service is the underlying struct for the super node

func (*Service) APIs

func (sap *Service) APIs() []rpc.API

APIs returns the RPC descriptors the super node service offers

func (*Service) Chain

func (sap *Service) Chain() shared.ChainType

Chain returns the chain type for this service

func (*Service) Node

func (sap *Service) Node() *core.Node

Node returns the node info for this service

func (*Service) Protocols

func (sap *Service) Protocols() []p2p.Protocol

Protocols exports the services p2p protocols, this service has none

func (*Service) Serve

func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData)

Serve listens for incoming converted data off the screenAndServePayload channel from the Sync process. It filters and sends this data to any subscribers to the service. This process can also be stood up alone, without a screenAndServePayload attached to a Sync process, and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only.

func (*Service) Start

func (sap *Service) Start(*p2p.Server) error

Start is used to begin the service. This is mostly just to satisfy the node.Service interface.

func (*Service) Stop

func (sap *Service) Stop() error

Stop is used to close down the service. This is mostly just to satisfy the node.Service interface.

func (*Service) Subscribe

func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings)

Subscribe is used by the API to remotely subscribe to the service loop. The params must be rlp serializable and satisfy the SubscriptionSettings interface.

func (*Service) Sync

func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared.ConvertedData) error

Sync streams incoming raw chain data and converts it for further processing. It forwards the converted data to the publishAndIndex process(es) it spins up. It also forwards the converted data to a ScreenAndServe process if there is one listening on the passed screenAndServePayload channel. This continues on no matter if or how many subscribers there are.

func (*Service) Unsubscribe

func (sap *Service) Unsubscribe(id rpc.ID)

Unsubscribe is used by the API to remotely unsubscribe from the service loop.

type Subscription

// Subscription holds the information for an individual client subscription
// to the super node.
type Subscription struct {
	// rpc ID of the subscription
	ID          rpc.ID
	// Send-only channel of SubscriptionPayload values
	PayloadChan chan<- SubscriptionPayload
	// Send-only channel of bool values
	// (presumably used to tell the client the subscription has ended — TODO confirm)
	QuitChan    chan<- bool
}

Subscription holds the information for an individual client subscription to the super node

type SubscriptionPayload

// SubscriptionPayload is the struct for a super node stream payload.
// It carries chain-specific serialized data, a height, an error string,
// and a message flag; every field is serialized to JSON under a lowercase key.
type SubscriptionPayload struct {
	Data   []byte `json:"data"` // e.g. for Ethereum rlp serialized eth.StreamPayload
	Height int64  `json:"height"`
	Err    string `json:"err"`  // field for error
	Flag   Flag   `json:"flag"` // field for message
}

SubscriptionPayload is the struct for a super node stream payload. It carries data of a type specific to the chain being supported/queried and an error message.

func (SubscriptionPayload) BackFillComplete

func (sp SubscriptionPayload) BackFillComplete() bool

func (SubscriptionPayload) Error

func (sp SubscriptionPayload) Error() error

type SuperNode

// SuperNode is the top level interface for streaming, converting to IPLDs,
// publishing, and indexing all chain data; screening this data; and serving
// it up to subscribed clients. It embeds node.Service.
type SuperNode interface {
	// Embedded interface providing APIs(), Protocols(), Start() and Stop()
	node.Service
	// Data processing event loop
	Sync(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.ConvertedData) error
	// Pub-Sub handling event loop
	Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData)
	// Method to subscribe to the service
	Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings)
	// Method to unsubscribe from the service
	Unsubscribe(id rpc.ID)
	// Method to access the node info for the service
	Node() *core.Node
	// Method to access chain type
	Chain() shared.ChainType
}

SuperNode is the top level interface for streaming, converting to IPLDs, publishing, and indexing all chain data; screening this data; and serving it up to subscribed clients. This service is compatible with the Ethereum service interface (node.Service).

func NewSuperNode

func NewSuperNode(settings *Config) (SuperNode, error)

NewSuperNode creates a new SuperNode using an underlying Service struct.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL