loop

package
v0.0.0-...-47b5856 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 1, 2024 License: MIT Imports: 41 Imported by: 8

README

LOOP Plugins

Local out of process (LOOP) plugins using github.com/hashicorp/go-plugin.

Packages

flowchart
    subgraph chainlink-common/pkg
        loop
        internal[loop/internal]
        pb[loop/internal/pb]
        test[loop/internal/test]

        internal --> pb
        test --> internal
        loop --> internal
        loop --> test
    end
    
    grpc[google.golang.org/grpc]
    hashicorp[hashicorp/go-plugin]

    loop ---> hashicorp
    loop ---> grpc
    test ---> grpc
    internal ---> grpc
    pb ---> grpc
    hashicorp --> grpc

package loop

Public API and hashicorp/go-plugin integration.

package test

Testing utilities.

package internal

GRPC client & server implementations.

package pb

Protocol buffer definitions & generated code.

Communication

GRPC client/server pairs are used to communicated between the host and each plugin. Plugins cannot communicate directly with one another, but the host can proxy a connection between them.

Here are the main components for the case of Median:

sequenceDiagram
    autonumber
    participant relayer as Relayer (plugin)
    participant core as Chainlink (host)
    participant median as Median (plugin)

    Note over core: KeystoreServer
    core->>+relayer: NewRelayer(Config, Keystore)
    Note over relayer: KeystoreClient
    Note over relayer: RelayerServer
    relayer->>-core: Relayer ID 
    Note over core: RelayerClient

    core->>+relayer: NewMedianProvider(RelayArgs, PluginArgs)
    Note over relayer: MedianProviderServer
    relayer->>-core: MedianProvider ID
    Note over core: MedianProvider (Proxy)

    Note over core:  DataSourceServer
    Note over core:  ErrorLogServer

    core->>+median: NewMedianFactory(MedianProvider, DataSource, ErrorLog)
    Note over median: MedianProviderClient
    Note over median: DataSourceClient
    Note over median: ErrorLogClient
    Note over median: MedianFactoryServer
    median->>-core: MedianFactory ID
    Note over core: MedianFactoryClient

    core->>+median: NewReportingPlugin(ReportingPluginConfig)
    Note over median: ReportingPluginServer
    median->>-core: ReportingPlugin ID
    Note over core: ReportingPluginClient

Note: MedianProvider includes multiple component services on the same connection.

sequenceDiagram
    autonumber
    participant relayer as Relayer (plugin)
    participant core as Chainlink (host)
    participant median as Median (plugin)

    core->>+relayer: NewMedianProvider(RelayArgs, PluginArgs)
    Note over relayer: OffchainConfigDigesterServer
    Note over relayer: ContractConfigTrackerServer
    Note over relayer: ContractTransmitterServer
    Note over relayer: ReportCodecServer
    Note over relayer: MedianContractServer
    Note over relayer: OnchainConfigCodecServer
    
    relayer->>-core: MedianProvider ID
    Note over core: MedianProvider (Proxy)
    
    Note over core: OffchainConfigDigesterClient
    Note over core: ContractConfigTrackerClient
    Note over core: ContractTransmitterClient
    
    core->>+median: NewMedianFactory(MedianProvider, DataSource, ErrorLog)
    Note over median: ReportCodecClient
    Note over median: MedianContractClient
    Note over median: OnchainConfigCodecClient

Auto-Recovery

The pluginService type contains reusable automatic recovery code.

type pluginService[P grpcPlugin, S services.Service] struct

Each plugin implements their own interface (Relayer, Median, etc.) with a new type that also embeds a pluginService. This new service type implements the original interface, but internally manages re-starting and re-connecting to the plugin as an implementation detail. So there is one long-lived instance of each plugin service and client, and whenever the plugin process (along with its server) crashes, the service restarts the plugin process and re-establishes the client's connection to the new server.

sequenceDiagram
    participant core as Chainlink (host)
    participant relayer as Process 1 (plugin)
    participant relayer2 as Process 2 (plugin)
    
    Note over core: RelayerService
    
    core->>+relayer: exec.Cmd
    core->>relayer: NewRelayerServer
    Note over relayer: RelayerServer
    relayer->>core: RelayerID
    Note over core: RelayerClient.init()
    
    loop
        core->>relayer: Alive?
        relayer->>core: Yes
    end
    core->>relayer: Alive?
    relayer->>-core: No

    core->>+relayer2: exec.Cmd
    core->>relayer2: NewRelayerServer
    Note over relayer2: RelayerServer
    relayer2->>core: RelayerID
    Note over core: RelayerClient.update()

    loop
        core->>relayer2: Alive?
        relayer2->>-core: Yes
    end

Documentation

Index

Constants

View Source
const PluginMedianName = "median"

PluginMedianName is the name for types.PluginMedian/[NewGRPCPluginMedian].

View Source
const PluginRelayerName = "relayer"

PluginRelayerName is the name for types.PluginRelayer/[NewGRPCPluginRelayer].

Variables

View Source
var ErrPluginUnavailable = internal.ErrPluginUnavailable

Functions

func HCLogLogger

func HCLogLogger(l logger.Logger) hclog.Logger

HCLogLogger returns an hclog.Logger backed by the given logger.Logger.

func NewLogger

func NewLogger() (logger.Logger, error)

NewLogger returns a new logger.Logger configured to encode hclog compatible JSON.

func PluginMedianHandshakeConfig

func PluginMedianHandshakeConfig() plugin.HandshakeConfig

func PluginRelayerHandshakeConfig

func PluginRelayerHandshakeConfig() plugin.HandshakeConfig

func RegisterStandAloneProvider

func RegisterStandAloneProvider(s *grpc.Server, p types.PluginProvider, pType types.OCR2PluginType) error

RegisterStandAloneProvider register the servers needed for a plugin provider, this is a workaround to test the Node API medianpoc on EVM until the EVM relayer is loopifyed

func SetupTracing

func SetupTracing(config TracingConfig) (err error)

SetupTracing initializes open telemetry with the provided config. It sets the global trace provider and opens a connection to the configured collector.

Types

type BrokerConfig

type BrokerConfig = internal.BrokerConfig

type ContextValues

type ContextValues struct {
	JobID   any
	JobName any

	ContractID    any
	FeedID        any
	TransmitterID any
}

ContextValues is a helper for passing values via a context.Context.

func (*ContextValues) Args

func (v *ContextValues) Args() (a []any)

Args returns a slice of args to pass to logger.Logger.With.

func (*ContextValues) ContextWithValues

func (v *ContextValues) ContextWithValues(ctx context.Context) context.Context

ContextWithValues returns a context.Context with values set from v.

func (*ContextValues) SetValues

func (v *ContextValues) SetValues(ctx context.Context)

SetValues sets v to values from the ctx.

type EnvConfig

type EnvConfig struct {
	PrometheusPort int

	TracingEnabled         bool
	TracingCollectorTarget string
	TracingSamplingRatio   float64
	TracingTLSCertPath     string
	TracingAttributes      map[string]string
}

EnvConfig is the configuration between the application and the LOOP executable. The values are fully resolved and static and passed via the environment.

func (*EnvConfig) AsCmdEnv

func (e *EnvConfig) AsCmdEnv() (env []string)

AsCmdEnv returns a slice of environment variable key/value pairs for an exec.Cmd.

type ErrorLog

type ErrorLog = types.ErrorLog

Deprecated

type GRPCOpts

type GRPCOpts = internal.GRPCOpts

func NewGRPCOpts

func NewGRPCOpts(registerer prometheus.Registerer) GRPCOpts

NewGRPCOpts initializes open telemetry and returns GRPCOpts with telemetry interceptors. It is called from the host and each plugin - intended as there is bidirectional communication

type GRPCPluginMedian

type GRPCPluginMedian struct {
	plugin.NetRPCUnsupportedPlugin

	BrokerConfig

	PluginServer types.PluginMedian
	// contains filtered or unexported fields
}

func (*GRPCPluginMedian) ClientConfig

func (p *GRPCPluginMedian) ClientConfig() *plugin.ClientConfig

func (*GRPCPluginMedian) GRPCClient

func (p *GRPCPluginMedian) GRPCClient(_ context.Context, broker *plugin.GRPCBroker, conn *grpc.ClientConn) (interface{}, error)

GRPCClient implements plugin.GRPCPlugin and returns the pluginClient types.PluginMedian, updated with the new broker and conn.

func (*GRPCPluginMedian) GRPCServer

func (p *GRPCPluginMedian) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error

type GRPCPluginRelayer

type GRPCPluginRelayer struct {
	plugin.NetRPCUnsupportedPlugin

	BrokerConfig

	PluginServer PluginRelayer
	// contains filtered or unexported fields
}

GRPCPluginRelayer implements plugin.GRPCPlugin for types.PluginRelayer.

func (*GRPCPluginRelayer) ClientConfig

func (p *GRPCPluginRelayer) ClientConfig() *plugin.ClientConfig

func (*GRPCPluginRelayer) GRPCClient

func (p *GRPCPluginRelayer) GRPCClient(_ context.Context, broker *plugin.GRPCBroker, conn *grpc.ClientConn) (interface{}, error)

GRPCClient implements plugin.GRPCPlugin and returns the pluginClient types.PluginRelayer, updated with the new broker and conn.

func (*GRPCPluginRelayer) GRPCServer

func (p *GRPCPluginRelayer) GRPCServer(broker *plugin.GRPCBroker, server *grpc.Server) error

type Keystore

type Keystore = types.Keystore

Deprecated

type MedianService

MedianService is a types.Service that maintains an internal types.PluginMedian.

func NewMedianService

func NewMedianService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, provider types.MedianProvider, dataSource, juelsPerFeeCoin median.DataSource, errorLog types.ErrorLog) *MedianService

NewMedianService returns a new *MedianService. cmd must return a new exec.Cmd each time it is called.

type Plugin

type Plugin struct {
	Logger logger.Logger
	// contains filtered or unexported fields
}

Plugin is a base layer for plugins to easily manage sub-[types.Service]s. Useful for implementing PluginRelayer and PluginMedian.

func (*Plugin) Close

func (p *Plugin) Close() (err error)

func (*Plugin) HealthReport

func (p *Plugin) HealthReport() map[string]error

func (*Plugin) Name

func (p *Plugin) Name() string

func (*Plugin) Ready

func (p *Plugin) Ready() error

func (*Plugin) SubService

func (p *Plugin) SubService(s services.Service)

type PluginMedian

type PluginMedian = types.PluginMedian

Deprecated

type PluginRelayer

type PluginRelayer = internal.PluginRelayer

type PromServer

type PromServer struct {
	// contains filtered or unexported fields
}

func NewPromServer

func NewPromServer(port int, lggr logger.Logger) *PromServer

func (*PromServer) Close

func (p *PromServer) Close() error

Close shuts down the underlying HTTP server. See http.Server.Close for details

func (*PromServer) Name

func (p *PromServer) Name() string

Name of the server

func (*PromServer) Start

func (p *PromServer) Start() error

Start starts HTTP server on specified port to handle metrics requests

type PromServerOpts

type PromServerOpts struct {
	Handler http.Handler
}

func (PromServerOpts) New

func (o PromServerOpts) New(port int, lggr logger.Logger) *PromServer

type Relayer

type Relayer = internal.Relayer

type RelayerAdapter

type RelayerAdapter struct {
	types.Relayer
	RelayerExt
}

RelayerAdapter adapts a types.Relayer and RelayerExt to implement Relayer.

func (*RelayerAdapter) Close

func (r *RelayerAdapter) Close() error

func (*RelayerAdapter) HealthReport

func (r *RelayerAdapter) HealthReport() map[string]error

func (*RelayerAdapter) Name

func (r *RelayerAdapter) Name() string

func (*RelayerAdapter) NewPluginProvider

func (r *RelayerAdapter) NewPluginProvider(ctx context.Context, rargs types.RelayArgs, pargs types.PluginArgs) (types.PluginProvider, error)

func (*RelayerAdapter) Ready

func (r *RelayerAdapter) Ready() (err error)

func (*RelayerAdapter) Start

func (r *RelayerAdapter) Start(ctx context.Context) error

type RelayerExt

type RelayerExt interface {
	types.ChainService
	ID() string
}

RelayerExt is a subset of loop.Relayer for adapting types.Relayer, typically with a Chain. See RelayerAdapter.

type RelayerService

type RelayerService struct {
	internal.PluginService[*GRPCPluginRelayer, Relayer]
}

RelayerService is a types.Service that maintains an internal Relayer.

func NewRelayerService

func NewRelayerService(lggr logger.Logger, grpcOpts GRPCOpts, cmd func() *exec.Cmd, config string, keystore types.Keystore) *RelayerService

NewRelayerService returns a new *RelayerService. cmd must return a new exec.Cmd each time it is called.

func (*RelayerService) GetChainStatus

func (r *RelayerService) GetChainStatus(ctx context.Context) (types.ChainStatus, error)

func (*RelayerService) ListNodeStatuses

func (r *RelayerService) ListNodeStatuses(ctx context.Context, pageSize int32, pageToken string) (nodes []types.NodeStatus, nextPageToken string, total int, err error)

func (*RelayerService) NewPluginProvider

func (r *RelayerService) NewPluginProvider(ctx context.Context, rargs types.RelayArgs, pargs types.PluginArgs) (types.PluginProvider, error)

func (*RelayerService) Transact

func (r *RelayerService) Transact(ctx context.Context, from, to string, amount *big.Int, balanceCheck bool) error

type ReportingPluginFactory

type ReportingPluginFactory = types.ReportingPluginFactory

Deprecated

type Server

type Server struct {
	GRPCOpts GRPCOpts
	Logger   logger.SugaredLogger
	// contains filtered or unexported fields
}

Server holds common plugin server fields.

func MustNewStartedServer

func MustNewStartedServer(loggerName string) *Server

MustNewStartedServer returns a new started Server like NewStartedServer, but logs and exits in the event of error. The caller is responsible for calling Server.Stop().

func NewStartedServer

func NewStartedServer(loggerName string) (*Server, error)

NewStartedServer returns a started Server. The caller is responsible for calling Server.Stop().

func (*Server) MustRegister

func (s *Server) MustRegister(c services.HealthReporter)

MustRegister registers the HealthReporter with services.HealthChecker, or exits upon failure.

func (*Server) Register

func (s *Server) Register(c services.HealthReporter) error

func (*Server) Stop

func (s *Server) Stop()

Stop closes resources and flushes logs.

type TracingConfig

type TracingConfig struct {
	// NodeAttributes are the attributes to attach to traces.
	NodeAttributes map[string]string

	// Enables tracing; requires a collector to be provided
	Enabled bool

	// Collector is the address of the OTEL collector to send traces to.
	CollectorTarget string

	// SamplingRatio is the ratio of traces to sample. 1.0 means sample all traces.
	SamplingRatio float64

	// TLSCertPath is the path to the TLS certificate to use when connecting to the collector.
	TLSCertPath string

	// OnDialError is called when the dialer fails, providing an opportunity to log.
	OnDialError func(error)
}

Directories

Path Synopsis
adapters
pb

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL