monitoring

package
v0.0.0-...-47b5856 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 1, 2024 License: MIT Imports: 35 Imported by: 0

README

On-chain monitor framework

Architecture

Chainlink blueprints contains a descript of the architecture of the on-chain monitor framework, the tools available for integrators as well as a tutorial for creating new integrations.

Documentation

Godoc generated documentation is available here

Developing the monitoring framework

Abstractions

Don't create abstractions for the sake of it. Always ask "What is the simplest thing that can solve the problem?" Always ask "Would this code make sense if I didn't write it but had to change it?" As a rule of thumb, abstraction around IO - think database or http connections - are almost always a good idea, because they make testing easier. Another rule of thumb, is to compose abstractions by means of dependency injection.

Concurrency

Concurrency is hard. To make it manageable, always use established concurrent patterns specific to golang. Eg. https://go.dev/blog/pipelines or https://talks.golang.org/2012/concurrency.slide Have all the concurrent code in one place and extract everything else in functions or interfaces. This will make testing the concurrent code easier - but still not easy!

A tenant of good concurrent code is resource management. Your primary resources are goroutines, channels and contexts (effectively wrappers ontop of channels). Make sure, that upon exit, your program cleanly terminates all goroutine, releases all OS resources (file pointers, sockets, etc.), no channel is being used anymore and all contexts are cancelled. This will force you to manage these things actively in your code and - hopefully - prevent leaks.

Concurrency abstractions are notoriously "leaky". Unless they are very simple layers on top of well tested solution - eg. Subprocesses is a wrapper over sync.WorkGroup - avoid introducing concurrency abstractions!

Logging

I have yet to find an engineer who like GBs of logging. Useless logs have a cognitive load on the person trying to solve an issue. My approach is to log as little as possible, but when you do log, put all the data needed to reproduce the issue and fix it in the log! Logging takes time to tune. Try to trigger or simulate errors in development and see if the log line is useful for debugging!

Testing

This is controversial but I'm not a huge fan of testing as much as possible. Most tests I've seen - and written - are brittle, are non-deterministic - ofc they break only in CI - and are not very valuable. The most valuable test is an end-to-end test that checks a use-case. The lest valuable test is a unit test that tests the implementation of a function.

Another thing that makes writing valuable tests easier is good "interfaces". If a piece of code has clearly defined inputs and outputs, it's easier to test.

Errors

An often overlooked part of the interface of a component are the errors it can produce. It's easy to return nil, err! Well defined errors can be either public values or error types - when more context is needed to debug the error. Make sure you consider whether a specific error can be handled by the caller or needs to be pushed up the stack!

Benchmarks

Execute the existing benchmark whenever a significant change to the system is introduced. While these benchmarks run in an ideal situation - eg. 0 network latency, correctly formatted message, etc. - they give a reference point for potential performance degradation introduced by new features.

Benchmarks are - arguably - the easiest way to profile your code!

Documentation

Overview

This file contains data generators and utilities to simplify tests. The data generated here shouldn't be used to run OCR instances

Index

Constants

This section is empty.

Variables

View Source
var (
	// Avro schemas to sync with the registry
	TransmissionAvroSchema        string
	ConfigSetSimplifiedAvroSchema string
)
View Source
var (
	// ErrNoUpdate is an error value interpreted by a Poller to mean that the
	// Fetch() was successful but a new value was not found.
	// The pollers will not report this as an error!
	ErrNoUpdate = errors.New("no updates found")
)

Functions

func MakeConfigSetSimplifiedMapping

func MakeConfigSetSimplifiedMapping(
	envelope Envelope,
	_ ChainConfig,
	feedConfig FeedConfig,
) (map[string]interface{}, error)

func MakeTransmissionMapping

func MakeTransmissionMapping(
	envelope Envelope,
	chainConfig ChainConfig,
	feedConfig FeedConfig,
) (map[string]interface{}, error)

func SubjectFromTopic

func SubjectFromTopic(topic string) string

SubjectFromTopic computes the associated AVRO schema subject name from a kafka topic name.

Types

type ChainConfig

type ChainConfig interface {
	GetRPCEndpoint() string
	GetNetworkName() string
	GetNetworkID() string
	GetChainID() string
	GetReadTimeout() time.Duration
	GetPollInterval() time.Duration
	// Useful for serializing to avro.
	// Check the latest version of the transmission schema to see what the exact return format should be.
	ToMapping() map[string]interface{}
}

ChainConfig contains chain-specific configuration. It is an interface so that implementations can add extra fields as long as they provide data from these methods which are required by the framework.

type ChainMetrics

type ChainMetrics interface {
	SetNewFeedConfigsDetected(numFeeds float64)

	IncSendMessageToKafkaFailed(topic string)
	IncSendMessageToKafkaSucceeded(topic string)
	AddSendMessageToKafkaBytes(bytes float64, topic string)
}

func NewChainMetrics

func NewChainMetrics(chainConfig ChainConfig) ChainMetrics

type Envelope

type Envelope struct {
	// latest transmission details
	ConfigDigest    types.ConfigDigest
	Epoch           uint32
	Round           uint8
	LatestAnswer    *big.Int
	LatestTimestamp time.Time

	// latest contract config
	ContractConfig types.ContractConfig

	// extra
	BlockNumber             uint64
	Transmitter             types.Account
	LinkBalance             *big.Int
	LinkAvailableForPayment *big.Int

	// The "fee coin" is different for each chain.
	JuelsPerFeeCoin   *big.Int
	AggregatorRoundID uint32
}

Envelope contains data that is required from all the chain integrations. Integrators usually create an EnvelopeSource to produce Envelope instances.

type Exporter

type Exporter interface {
	// Export is executed on each update on a monitored feed
	Export(ctx context.Context, data interface{})
	// Cleanup is executed once a monitor for a specific feed is terminated.
	Cleanup(ctx context.Context)
}

Exporter methods can be executed out of order and should be thread safe.

type ExporterFactory

type ExporterFactory interface {
	NewExporter(ExporterParams) (Exporter, error)
}

ExporterFactory is used to create a new exporter for each feed that needs to be monitored.

func NewKafkaExporterFactory

func NewKafkaExporterFactory(
	log Logger,
	producer Producer,
	pipelines []Pipeline,
) (ExporterFactory, error)

NewKafkaExporterFactory produces Kafka exporters which consume, format and publish source outputs to kafka.

func NewPrometheusExporterFactory

func NewPrometheusExporterFactory(
	log Logger,
	metrics Metrics,
) ExporterFactory

type ExporterFactoryMock

type ExporterFactoryMock struct {
	mock.Mock
}

ExporterFactoryMock is an autogenerated mock type for the ExporterFactory type

func NewExporterFactoryMock

func NewExporterFactoryMock(t interface {
	mock.TestingT
	Cleanup(func())
}) *ExporterFactoryMock

NewExporterFactoryMock creates a new instance of ExporterFactoryMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*ExporterFactoryMock) NewExporter

func (_m *ExporterFactoryMock) NewExporter(_a0 ExporterParams) (Exporter, error)

NewExporter provides a mock function with given fields: _a0

type ExporterMock

type ExporterMock struct {
	mock.Mock
}

ExporterMock is an autogenerated mock type for the Exporter type

func NewExporterMock

func NewExporterMock(t interface {
	mock.TestingT
	Cleanup(func())
}) *ExporterMock

NewExporterMock creates a new instance of ExporterMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*ExporterMock) Cleanup

func (_m *ExporterMock) Cleanup(ctx context.Context)

Cleanup provides a mock function with given fields: ctx

func (*ExporterMock) Export

func (_m *ExporterMock) Export(ctx context.Context, data interface{})

Export provides a mock function with given fields: ctx, data

type ExporterParams

type ExporterParams struct {
	ChainConfig ChainConfig
	FeedConfig  FeedConfig
	Nodes       []NodeConfig
}

type FeedConfig

type FeedConfig interface {
	// This functions as a feed identifier.
	GetID() string
	GetName() string
	GetPath() string
	GetSymbol() string
	GetHeartbeatSec() int64
	GetContractType() string
	GetContractStatus() string
	GetContractAddress() string
	GetContractAddressBytes() []byte
	// GetMultiply() returns the multiply parameter of a feed.
	// This is a misnomer kept for historical reasons. Multiply is used as divisor
	// for the big integers read from on-chain - think balances, observations,
	// etc. - into prometheus-friendly float64s.
	GetMultiply() *big.Int
	// ToMapping() is useful when encoding kafka messages.
	ToMapping() map[string]interface{}
}

FeedConfig is the interface for feed configurations extracted from the RDD. Implementation can add more fields as needed, but this subset is required by the framework.

type FeedMetrics

type FeedMetrics interface {
	IncFetchFromSourceFailed(sourceName string)
	IncFetchFromSourceSucceeded(sourceName string)
	ObserveFetchFromSourceDuraction(duration time.Duration, sourceName string)
}

func NewFeedMetrics

func NewFeedMetrics(chainConfig ChainConfig, feedConfig FeedConfig) FeedMetrics

type FeedMonitor

type FeedMonitor interface {
	Run(ctx context.Context)
}

func NewFeedMonitor

func NewFeedMonitor(
	log Logger,
	pollers []Poller,
	exporters []Exporter,
) FeedMonitor

type FeedsParser

type FeedsParser func(buf io.ReadCloser) ([]FeedConfig, error)

FeedParser is the interface for deserializing feed configuration data for each chain integration.

type HTTPServer

type HTTPServer interface {
	Handle(path string, handler http.Handler)
	Run(ctx context.Context)
}

HTTPServer is the HTTP interface exposed by every monitoring. It's used to export metrics to prometheus, to query the node for configurations, etc.

func NewHTTPServer

func NewHTTPServer(baseCtx context.Context, addr string, log Logger) HTTPServer

type Logger

type Logger = logger.Logger

Logger is a type alias for backwards compatibility.

type ManagedFunc

type ManagedFunc func(localCtx context.Context, data RDDData)

type Manager

type Manager interface {
	Run(backgroundCtx context.Context, managed ManagedFunc)
	HTTPHandler() http.Handler
}

Manager restarts the multi-feed monitor whenever the feed configuration list has changed. In order to not be coupled with the MultiFeedMonitor component, it simply runs a function every time the feed configuration has changed. This is hooked up to the MultiFeedMonitor.Run method in the Monitor.

func NewManager

func NewManager(
	log Logger,
	rddPoller Poller,
) Manager

type Mapper

type Mapper func(Envelope, ChainConfig, FeedConfig) (map[string]interface{}, error)

Mapper is an interface for converting Envelopes into data structures that can be encoded in AVRO and sent to Kafka.

type Metrics

type Metrics interface {
	SetHeadTrackerCurrentHead(blockNumber float64, networkName, chainID, networkID string)
	SetFeedContractMetadata(chainID, contractAddress, feedID, contractStatus, contractType, feedName, feedPath, networkID, networkName, symbol string)
	SetFeedContractLinkBalance(balance float64, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string)
	SetLinkAvailableForPayment(amount float64, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string)
	SetFeedContractTransactionsSucceeded(numSucceeded float64, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string)
	SetFeedContractTransactionsFailed(numFailed float64, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string)
	SetNodeMetadata(chainID, networkID, networkName, oracleName, sender string)
	// Deprecated: use SetOffchainAggregatorAnswers
	SetOffchainAggregatorAnswersRaw(answer float64, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string)
	SetOffchainAggregatorAnswers(answer float64, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string)
	IncOffchainAggregatorAnswersTotal(contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string)
	// Deprecated: use SetOffchainAggregatorJuelsPerFeeCoin
	SetOffchainAggregatorJuelsPerFeeCoinRaw(juelsPerFeeCoin float64, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string)
	SetOffchainAggregatorJuelsPerFeeCoin(juelsPerFeeCoin float64, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string)
	SetOffchainAggregatorSubmissionReceivedValues(value float64, contractAddress, feedID, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string)
	SetOffchainAggregatorJuelsPerFeeCoinReceivedValues(value float64, contractAddress, feedID, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string)
	SetOffchainAggregatorAnswerStalled(isSet bool, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string)
	SetOffchainAggregatorRoundID(aggregatorRoundID float64, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName string)
	// Cleanup deletes all the metrics
	Cleanup(networkName, networkID, chainID, oracleName, sender, feedName, feedPath, symbol, contractType, contractStatus, contractAddress, feedID string)
	// Exposes the accumulated metrics to HTTP in the prometheus format, ready for scraping.
	HTTPHandler() http.Handler
}

Metrics is a thin interface on top of the prometheus API. As such there should be little logic in the implementation of these methods.

func NewMetrics

func NewMetrics(log Logger) Metrics

type MetricsMock

type MetricsMock struct {
	mock.Mock
}

MetricsMock is an autogenerated mock type for the Metrics type

func NewMetricsMock

func NewMetricsMock(t interface {
	mock.TestingT
	Cleanup(func())
}) *MetricsMock

NewMetricsMock creates a new instance of MetricsMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MetricsMock) Cleanup

func (_m *MetricsMock) Cleanup(networkName string, networkID string, chainID string, oracleName string, sender string, feedName string, feedPath string, symbol string, contractType string, contractStatus string, contractAddress string, feedID string)

Cleanup provides a mock function with given fields: networkName, networkID, chainID, oracleName, sender, feedName, feedPath, symbol, contractType, contractStatus, contractAddress, feedID

func (*MetricsMock) HTTPHandler

func (_m *MetricsMock) HTTPHandler() http.Handler

HTTPHandler provides a mock function with given fields:

func (*MetricsMock) IncOffchainAggregatorAnswersTotal

func (_m *MetricsMock) IncOffchainAggregatorAnswersTotal(contractAddress string, feedID string, chainID string, contractStatus string, contractType string, feedName string, feedPath string, networkID string, networkName string)

IncOffchainAggregatorAnswersTotal provides a mock function with given fields: contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName

func (*MetricsMock) SetFeedContractLinkBalance

func (_m *MetricsMock) SetFeedContractLinkBalance(balance float64, contractAddress string, feedID string, chainID string, contractStatus string, contractType string, feedName string, feedPath string, networkID string, networkName string)

SetFeedContractLinkBalance provides a mock function with given fields: balance, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName

func (*MetricsMock) SetFeedContractMetadata

func (_m *MetricsMock) SetFeedContractMetadata(chainID string, contractAddress string, feedID string, contractStatus string, contractType string, feedName string, feedPath string, networkID string, networkName string, symbol string)

SetFeedContractMetadata provides a mock function with given fields: chainID, contractAddress, feedID, contractStatus, contractType, feedName, feedPath, networkID, networkName, symbol

func (*MetricsMock) SetFeedContractTransactionsFailed

func (_m *MetricsMock) SetFeedContractTransactionsFailed(numFailed float64, contractAddress string, feedID string, chainID string, contractStatus string, contractType string, feedName string, feedPath string, networkID string, networkName string)

SetFeedContractTransactionsFailed provides a mock function with given fields: numFailed, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName

func (*MetricsMock) SetFeedContractTransactionsSucceeded

func (_m *MetricsMock) SetFeedContractTransactionsSucceeded(numSucceeded float64, contractAddress string, feedID string, chainID string, contractStatus string, contractType string, feedName string, feedPath string, networkID string, networkName string)

SetFeedContractTransactionsSucceeded provides a mock function with given fields: numSucceeded, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName

func (*MetricsMock) SetHeadTrackerCurrentHead

func (_m *MetricsMock) SetHeadTrackerCurrentHead(blockNumber float64, networkName string, chainID string, networkID string)

SetHeadTrackerCurrentHead provides a mock function with given fields: blockNumber, networkName, chainID, networkID

func (*MetricsMock) SetLinkAvailableForPayment

func (_m *MetricsMock) SetLinkAvailableForPayment(amount float64, feedID string, chainID string, contractStatus string, contractType string, feedName string, feedPath string, networkID string, networkName string)

SetLinkAvailableForPayment provides a mock function with given fields: amount, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName

func (*MetricsMock) SetNodeMetadata

func (_m *MetricsMock) SetNodeMetadata(chainID string, networkID string, networkName string, oracleName string, sender string)

SetNodeMetadata provides a mock function with given fields: chainID, networkID, networkName, oracleName, sender

func (*MetricsMock) SetOffchainAggregatorAnswerStalled

func (_m *MetricsMock) SetOffchainAggregatorAnswerStalled(isSet bool, contractAddress string, feedID string, chainID string, contractStatus string, contractType string, feedName string, feedPath string, networkID string, networkName string)

SetOffchainAggregatorAnswerStalled provides a mock function with given fields: isSet, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName

func (*MetricsMock) SetOffchainAggregatorAnswers

func (_m *MetricsMock) SetOffchainAggregatorAnswers(answer float64, contractAddress string, feedID string, chainID string, contractStatus string, contractType string, feedName string, feedPath string, networkID string, networkName string)

SetOffchainAggregatorAnswers provides a mock function with given fields: answer, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName

func (*MetricsMock) SetOffchainAggregatorAnswersRaw

func (_m *MetricsMock) SetOffchainAggregatorAnswersRaw(answer float64, contractAddress string, feedID string, chainID string, contractStatus string, contractType string, feedName string, feedPath string, networkID string, networkName string)

SetOffchainAggregatorAnswersRaw provides a mock function with given fields: answer, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName

func (*MetricsMock) SetOffchainAggregatorJuelsPerFeeCoin

func (_m *MetricsMock) SetOffchainAggregatorJuelsPerFeeCoin(juelsPerFeeCoin float64, contractAddress string, feedID string, chainID string, contractStatus string, contractType string, feedName string, feedPath string, networkID string, networkName string)

SetOffchainAggregatorJuelsPerFeeCoin provides a mock function with given fields: juelsPerFeeCoin, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName

func (*MetricsMock) SetOffchainAggregatorJuelsPerFeeCoinRaw

func (_m *MetricsMock) SetOffchainAggregatorJuelsPerFeeCoinRaw(juelsPerFeeCoin float64, contractAddress string, feedID string, chainID string, contractStatus string, contractType string, feedName string, feedPath string, networkID string, networkName string)

SetOffchainAggregatorJuelsPerFeeCoinRaw provides a mock function with given fields: juelsPerFeeCoin, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName

func (*MetricsMock) SetOffchainAggregatorJuelsPerFeeCoinReceivedValues

func (_m *MetricsMock) SetOffchainAggregatorJuelsPerFeeCoinReceivedValues(value float64, contractAddress string, feedID string, sender string, chainID string, contractStatus string, contractType string, feedName string, feedPath string, networkID string, networkName string)

SetOffchainAggregatorJuelsPerFeeCoinReceivedValues provides a mock function with given fields: value, contractAddress, feedID, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName

func (*MetricsMock) SetOffchainAggregatorRoundID

func (_m *MetricsMock) SetOffchainAggregatorRoundID(aggregatorRoundID float64, contractAddress string, feedID string, chainID string, contractStatus string, contractType string, feedName string, feedPath string, networkID string, networkName string)

SetOffchainAggregatorRoundID provides a mock function with given fields: aggregatorRoundID, contractAddress, feedID, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName

func (*MetricsMock) SetOffchainAggregatorSubmissionReceivedValues

func (_m *MetricsMock) SetOffchainAggregatorSubmissionReceivedValues(value float64, contractAddress string, feedID string, sender string, chainID string, contractStatus string, contractType string, feedName string, feedPath string, networkID string, networkName string)

SetOffchainAggregatorSubmissionReceivedValues provides a mock function with given fields: value, contractAddress, feedID, sender, chainID, contractStatus, contractType, feedName, feedPath, networkID, networkName

type Monitor

type Monitor struct {
	RootContext context.Context

	ChainConfig ChainConfig
	Config      config.Config

	Log            Logger
	Producer       Producer
	Metrics        Metrics
	ChainMetrics   ChainMetrics
	SchemaRegistry SchemaRegistry

	SourceFactories   []SourceFactory
	ExporterFactories []ExporterFactory

	RDDSource Source
	RDDPoller Poller

	Manager Manager

	HTTPServer HTTPServer
}

Monitor is the entrypoint for an on-chain monitor integration. Monitors should only be created via NewMonitor()

func NewMonitor

func NewMonitor(
	rootCtx context.Context,
	log Logger,
	chainConfig ChainConfig,
	envelopeSourceFactory SourceFactory,
	txResultsSourceFactory SourceFactory,
	feedsParser FeedsParser,
	nodesParser NodesParser,
) (*Monitor, error)

NewMonitor builds a new Monitor instance using dependency injection. If advanced configurations of the Monitor are required - for instance, adding a custom third party service to send data to - this method should provide a good starting template to do that.

func (Monitor) Run

func (m Monitor) Run()

Run() starts all the goroutines needed by a Monitor. The lifecycle of these routines is controlled by the context passed to the NewMonitor constructor.

type MultiFeedMonitor

type MultiFeedMonitor interface {
	Run(ctx context.Context, data RDDData)
}

MultiFeedMonitor manages the flow of data from multiple sources to multiple exporters for each feed in the configuration.

func NewMultiFeedMonitor

func NewMultiFeedMonitor(
	chainConfig ChainConfig,
	log Logger,

	sourceFactories []SourceFactory,
	exporterFactories []ExporterFactory,

	bufferCapacity uint32,
) MultiFeedMonitor

type NodeConfig

type NodeConfig interface {
	GetName() string
	GetAccount() types.Account
}

NodeConfig is the subset of on-chain node operator's configuration required by the OM framework.

type NodesParser

type NodesParser func(buf io.ReadCloser) ([]NodeConfig, error)

NodesParser extracts multiple nodes' configurations from the configuration server, eg. weiwatchers.com

type Pipeline

type Pipeline struct {
	Topic  string
	Mapper Mapper
	Schema Schema
}

Pipeline represents a succession of transformations on the data coming from a source: source output -> adapt to a map -> encode to AVRO -> send to Kafka

type Poller

type Poller interface {
	Updater // Poller is just another name for updater.
}

Poller implements Updater by periodically invoking a Source's Fetch() method.

func NewSourcePoller

func NewSourcePoller(
	source Source,
	log Logger,
	pollInterval time.Duration,
	fetchTimeout time.Duration,
	bufferCapacity uint32,
) Poller

NewSourcePoller builds Pollers for Sources. If the Source's Fetch() returns an error it will be reported. If it panics, the panic will be recovered and reported as an error and the program will resume operation. If the error is ErrNoUpdate, it will not be reported and the Poller will skip this round.

type Producer

type Producer interface {
	Produce(key, value []byte, topic string) error
}

Producer is an abstraction on top of Kafka to aid with tests.

func NewInstrumentedProducer

func NewInstrumentedProducer(producer Producer, chainMetrics ChainMetrics) Producer

func NewProducer

func NewProducer(ctx context.Context, log Logger, cfg config.Kafka) (Producer, error)

type RDDData

type RDDData struct {
	Feeds []FeedConfig `json:"feeds,omitempty"`
	Nodes []NodeConfig `json:"nodes,omitempty"`
}

type Schema

type Schema interface {
	ID() int
	Version() int
	Subject() string
	Encode(interface{}) ([]byte, error)
	Decode([]byte) (interface{}, error)
}

Schema is an interface for encoding/decoding data structures into the AVRO format. The data structures need to conform with a predefined AVRO schema, encoding will fail otherwise. The schemas are maintained in a central repository called a [schema registry](https://github.com/confluentinc/schema-registry)

type SchemaRegistry

type SchemaRegistry interface {
	// EnsureSchema handles three cases when pushing a schema spec to the SchemaRegistry:
	// 1. when the schema with a given subject does not exist, it will create it.
	// 2. if a schema with the given subject already exists but the spec is different, it will update it and bump the version.
	// 3. if the schema exists and the spec is the same, it will not do anything.
	EnsureSchema(subject, spec string) (Schema, error)
}

func NewSchemaRegistry

func NewSchemaRegistry(cfg config.SchemaRegistry, log Logger) SchemaRegistry

type Source

type Source interface {
	// Fetch must be thread-safe!
	// There is no guarantee on the ordering of Fetch() calls for the same source instance.
	Fetch(context.Context) (interface{}, error)
}

Source is an abstraction for reading data from a remote API, usually a chain RPC endpoint.

func NewFakeRDDSource

func NewFakeRDDSource(minFeeds, maxFeeds uint8) Source

func NewRDDSource

func NewRDDSource(
	feedsURL string,
	feedsParser FeedsParser,
	feedsIgnoreIDs []string,
	nodesURL string,
	nodesParser NodesParser,
	log Logger,
) Source

type SourceFactory

type SourceFactory interface {
	NewSource(chainConfig ChainConfig, feedConfig FeedConfig) (Source, error)
	// GetType should return a namespace for all the source instances produced by this factory.
	GetType() string
}

func NewInstrumentedSourceFactory

func NewInstrumentedSourceFactory(sourceFactory SourceFactory, chainMetrics ChainMetrics) SourceFactory

NewInstrumentedSourceFactory wraps a Source and transparently monitors it.

type SourceFactoryMock

type SourceFactoryMock struct {
	mock.Mock
}

SourceFactoryMock is an autogenerated mock type for the SourceFactory type

func NewSourceFactoryMock

func NewSourceFactoryMock(t interface {
	mock.TestingT
	Cleanup(func())
}) *SourceFactoryMock

NewSourceFactoryMock creates a new instance of SourceFactoryMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*SourceFactoryMock) GetType

func (_m *SourceFactoryMock) GetType() string

GetType provides a mock function with given fields:

func (*SourceFactoryMock) NewSource

func (_m *SourceFactoryMock) NewSource(chainConfig ChainConfig, feedConfig FeedConfig) (Source, error)

NewSource provides a mock function with given fields: chainConfig, feedConfig

type SourceMock

type SourceMock struct {
	mock.Mock
}

SourceMock is an autogenerated mock type for the Source type

func NewSourceMock

func NewSourceMock(t interface {
	mock.TestingT
	Cleanup(func())
}) *SourceMock

NewSourceMock creates a new instance of SourceMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*SourceMock) Fetch

func (_m *SourceMock) Fetch(_a0 context.Context) (interface{}, error)

Fetch provides a mock function with given fields: _a0

type TxResults

type TxResults struct {
	NumSucceeded, NumFailed uint64
}

TxResults counts the number of successful and failed transactions in a predetermined window of time. Integrators usually create an TxResultsSource to produce TxResults instances.

type Updater

type Updater interface {
	// Run should be executed as a goroutine otherwise it will block.
	Run(context.Context)
	// You should never close the channel returned by Updates()!
	// You should always read from the channel returned by Updates() in a
	// select statement with the same context you passed to Run()
	Updates() <-chan interface{}
}

Updater is a generic interface implemented by either polling or subscribing.

Directories

Path Synopsis
Package monitoring contains a small DSL to help write more robust Avro schemas by taking advantage of go's type system.
Package monitoring contains a small DSL to help write more robust Avro schemas by taking advantage of go's type system.
package config parses flags, environment variables and json object to build a Config object that's used throughout the monitor.
package config parses flags, environment variables and json object to build a Config object that's used throughout the monitor.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL