services

package
v0.10.8
Warning

This package is not in the latest version of its module.
Published: Jun 21, 2021 License: MIT Imports: 38 Imported by: 4

Documentation

Overview

Package services contains the key components of the Chainlink node. This includes the Application, JobRunner, LogListener, and Scheduler.

Application

The Application is the main component used for starting and stopping the Chainlink node.

JobRunner

The JobRunner keeps track of Runs within a Job and ensures that they're executed in order. Within each Run, the tasks are also executed from the JobRunner.

JobSubscriber

The JobSubscriber coordinates running job events with the EventLog in the Store, and also subscribes to the given address on the Ethereum blockchain.

Scheduler

The Scheduler ensures that recurring events are executed according to their schedule, and one-time events occur only when the specified time has passed.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExpectedRecurringScheduleJobError added in v0.8.2

func ExpectedRecurringScheduleJobError(err error) bool

func NewPromReporter added in v0.9.6

func NewPromReporter(db *sql.DB, opts ...PrometheusBackend) *promReporter

func NewRun

func NewRun(
	job *models.JobSpec,
	initiator *models.Initiator,
	currentHeight *big.Int,
	runRequest *models.RunRequest,
	config orm.ConfigReader,
	orm *orm.ORM,
	now time.Time) (*models.JobRun, []*adapters.PipelineAdapter)

NewRun returns a complete run from a JobSpec

func NewSessionReaper added in v0.10.8

func NewSessionReaper(store *store.Store) utils.SleeperTask

NewSessionReaper creates a reaper that cleans stale sessions from the store.

func ProcessLogRequest added in v0.10.7

func ProcessLogRequest(runManager RunManager, le models.LogRequest)

ProcessLogRequest parses the log and runs the job indicated by its GetJobSpecID method.

func ValidateBridgeType

func ValidateBridgeType(bt *models.BridgeTypeRequest, store *store.Store) error

ValidateBridgeType checks that the bridge type doesn't have a duplicate name, an invalid name, or an invalid URL.

func ValidateBridgeTypeNotExist added in v0.8.2

func ValidateBridgeTypeNotExist(bt *models.BridgeTypeRequest, store *store.Store) error

ValidateBridgeTypeNotExist checks that a bridge has not already been created

func ValidateExternalInitiator added in v0.6.6

func ValidateExternalInitiator(
	exi *models.ExternalInitiatorRequest,
	store *store.Store,
) error

ValidateExternalInitiator checks whether External Initiator parameters are safe for processing.

func ValidateInitiator

func ValidateInitiator(i models.Initiator, j models.JobSpec, store *store.Store) error

ValidateInitiator checks the Initiator for any application logic errors.

func ValidateJob

func ValidateJob(j models.JobSpec, store *store.Store, keyStore *keystore.Master) error

ValidateJob checks the job and its associated Initiators and Tasks for any application logic errors.

func ValidateRun added in v0.8.2

func ValidateRun(run *models.JobRun, contractCost *assets.Link)

ValidateRun ensures that a run's initial preconditions have been met

func ValidateServiceAgreement

func ValidateServiceAgreement(sa models.ServiceAgreement, store *store.Store, keyStore *keystore.Master) error

ValidateServiceAgreement checks the ServiceAgreement for any application logic errors.

Types

type BalanceMonitor added in v0.8.11

type BalanceMonitor interface {
	httypes.HeadTrackable
	GetEthBalance(gethCommon.Address) *assets.Eth
	service.Service
}

BalanceMonitor checks the balance for each key on every new head

func NewBalanceMonitor added in v0.8.11

func NewBalanceMonitor(store *store.Store, ethKeyStore *keystore.Eth) BalanceMonitor

NewBalanceMonitor returns a new balanceMonitor

type Cron

type Cron interface {
	Start()
	Stop() context.Context
	AddFunc(string, func()) (cron.EntryID, error)
}
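The Cron interface has no description here. Because Recurring exposes it as an exported field, one plausible use is substituting a test double for the real scheduler. The following is a minimal sketch under that assumption: `EntryID` is a local stand-in for `cron.EntryID` so the example needs no third-party import, and `fakeCron` and `Trigger` are hypothetical names that do not exist in this package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// EntryID is a local stand-in for cron.EntryID (assumption of this sketch).
type EntryID int

// Cron mirrors the documented method set, with EntryID substituted.
type Cron interface {
	Start()
	Stop() context.Context
	AddFunc(string, func()) (EntryID, error)
}

// fakeCron is a hypothetical test double: it records entries and runs
// them only when Trigger is called, instead of on a real schedule.
type fakeCron struct {
	started bool
	specs   []string
	funcs   []func()
}

func (f *fakeCron) Start() { f.started = true }

// Stop returns an already-cancelled context, meaning no job is in flight.
func (f *fakeCron) Stop() context.Context {
	f.started = false
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return ctx
}

// AddFunc records the schedule and function; an empty spec is rejected.
func (f *fakeCron) AddFunc(spec string, fn func()) (EntryID, error) {
	if spec == "" {
		return 0, errors.New("empty schedule")
	}
	f.specs = append(f.specs, spec)
	f.funcs = append(f.funcs, fn)
	return EntryID(len(f.specs)), nil
}

// Trigger runs every registered function once, simulating a single tick.
func (f *fakeCron) Trigger() {
	for _, fn := range f.funcs {
		fn()
	}
}

func main() {
	fc := &fakeCron{}
	var c Cron = fc
	runs := 0
	id, err := c.AddFunc("0 * * * *", func() { runs++ })
	c.Start()
	fc.Trigger()
	<-c.Stop().Done() // returns at once: the fake has nothing running
	fmt.Println(id, err, runs)
}
```

A double like this lets a test fire scheduled functions deterministically rather than waiting for wall-clock time.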

type InitiatorSubscription

type InitiatorSubscription struct {
	Initiator models.Initiator
	// contains filtered or unexported fields
}

InitiatorSubscription encapsulates all functionality needed to wrap an ethereum subscription for use with a Chainlink Initiator. Initiator specific functionality is delegated to the callback.

func NewInitiatorSubscription

func NewInitiatorSubscription(
	initr models.Initiator,
	client eth.Client,
	runManager RunManager,
	filter ethereum.FilterQuery,
	backfillBatchSize uint32,
	callback func(RunManager, models.LogRequest),
) (*InitiatorSubscription, error)

NewInitiatorSubscription creates a new InitiatorSubscription that feeds received logs to the callback func parameter.

func (*InitiatorSubscription) Start added in v0.10.7

func (sub *InitiatorSubscription) Start()

func (*InitiatorSubscription) Unsubscribe added in v0.10.7

func (sub *InitiatorSubscription) Unsubscribe()

Unsubscribe closes channels and cleans up resources.

type JobSubscriber

type JobSubscriber interface {
	httypes.HeadTrackable
	AddJob(job models.JobSpec, bn *models.Head) error
	RemoveJob(ID models.JobID) error
	Jobs() []models.JobSpec
	service.Service
}

JobSubscriber listens for push notifications of event logs from the ethereum node's websocket for specific jobs by subscribing to ethLogs.

func NewJobSubscriber

func NewJobSubscriber(store *store.Store, runManager RunManager) JobSubscriber

NewJobSubscriber returns a new job subscriber.

type JobSubscription

type JobSubscription struct {
	Job models.JobSpec
	// contains filtered or unexported fields
}

JobSubscription listens to event logs being pushed from the Ethereum Node to a job.

func StartJobSubscription

func StartJobSubscription(job models.JobSpec, head *models.Head, store *strpkg.Store, runManager RunManager) (JobSubscription, error)

StartJobSubscription constructs a JobSubscription which listens for and tracks event logs corresponding to the specified job. Ignores any errors if there is at least one successful subscription to an initiator log.
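The "ignore errors if at least one subscription succeeds" behavior described above can be sketched as follows. This is an illustration only, not the package's implementation: `subscribeAll`, `okSub`, and `failSub` are hypothetical names, and plain functions stand in for the per-initiator subscription attempts.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// subscribeAll tries every subscribe function and returns how many
// succeeded. Failures are surfaced only when nothing succeeded.
func subscribeAll(subscribers []func() error) (int, error) {
	var msgs []string
	succeeded := 0
	for _, subscribe := range subscribers {
		if err := subscribe(); err != nil {
			msgs = append(msgs, err.Error())
			continue
		}
		succeeded++
	}
	if succeeded == 0 && len(msgs) > 0 {
		return 0, errors.New(strings.Join(msgs, "; "))
	}
	return succeeded, nil
}

// okSub and failSub are demo subscribers (hypothetical).
func okSub() error   { return nil }
func failSub() error { return errors.New("subscription failed") }

func main() {
	n, err := subscribeAll([]func() error{failSub, okSub})
	fmt.Println(n, err)
	n, err = subscribeAll([]func() error{failSub, failSub})
	fmt.Println(n, err)
}
```

The trade-off of this design is that partial failures are silent to the caller, so they would need to be logged at the point where they occur.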

func (JobSubscription) Unsubscribe

func (js JobSubscription) Unsubscribe()

Unsubscribe stops the subscription and cleans up associated resources.

type NullBalanceMonitor added in v0.9.3

type NullBalanceMonitor struct{}

func (*NullBalanceMonitor) Close added in v0.10.8

func (*NullBalanceMonitor) Close() error

func (*NullBalanceMonitor) Connect added in v0.9.3

func (*NullBalanceMonitor) Connect(head *models.Head) error

func (*NullBalanceMonitor) Disconnect added in v0.9.3

func (*NullBalanceMonitor) Disconnect()

func (*NullBalanceMonitor) GetEthBalance added in v0.9.3

func (*NullBalanceMonitor) GetEthBalance(gethCommon.Address) *assets.Eth

func (*NullBalanceMonitor) Healthy added in v0.10.8

func (*NullBalanceMonitor) Healthy() error

func (*NullBalanceMonitor) OnNewLongestChain added in v0.9.3

func (*NullBalanceMonitor) OnNewLongestChain(ctx context.Context, head models.Head)

func (*NullBalanceMonitor) Ready added in v0.10.8

func (*NullBalanceMonitor) Ready() error

func (*NullBalanceMonitor) Start added in v0.10.8

func (*NullBalanceMonitor) Start() error

type NullJobSubscriber added in v0.10.7

type NullJobSubscriber struct{}

NullJobSubscriber implements Null pattern for JobSubscriber interface

func (NullJobSubscriber) AddJob added in v0.10.7

func (NullJobSubscriber) AddJob(job models.JobSpec, bn *models.Head) error

func (NullJobSubscriber) Close added in v0.10.8

func (NullJobSubscriber) Close() error

func (NullJobSubscriber) Connect added in v0.10.7

func (NullJobSubscriber) Connect(head *models.Head) error

func (NullJobSubscriber) Healthy added in v0.10.8

func (NullJobSubscriber) Healthy() error

func (NullJobSubscriber) Jobs added in v0.10.7

func (NullJobSubscriber) Jobs() (j []models.JobSpec)

func (NullJobSubscriber) OnNewLongestChain added in v0.10.7

func (NullJobSubscriber) OnNewLongestChain(ctx context.Context, head models.Head)

func (NullJobSubscriber) Ready added in v0.10.8

func (NullJobSubscriber) Ready() error

func (NullJobSubscriber) RemoveJob added in v0.10.7

func (NullJobSubscriber) RemoveJob(ID models.JobID) error

func (NullJobSubscriber) Start added in v0.10.8

func (NullJobSubscriber) Start() error

type NullRunExecutor added in v0.10.7

type NullRunExecutor struct{}

NullRunExecutor implements Null pattern for RunExecutor interface

func (NullRunExecutor) Execute added in v0.10.7

func (NullRunExecutor) Execute(uuid.UUID) error
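The Null types in this package follow the null object pattern: a do-nothing implementation satisfies the interface so that callers never need a nil check. A minimal sketch of the idea, with assumptions stated: the ID is a string instead of `uuid.UUID` to keep the example self-contained, `recordingExecutor` and `runAll` are hypothetical, and returning nil from the null implementation is a choice made for this sketch (the documentation above does not say what the real method returns).

```go
package main

import "fmt"

// RunExecutor is a stand-in for the documented interface, using a
// string ID in place of uuid.UUID.
type RunExecutor interface {
	Execute(id string) error
}

// nullRunExecutor does nothing. Returning nil is an assumption here.
type nullRunExecutor struct{}

func (nullRunExecutor) Execute(string) error { return nil }

// recordingExecutor is a hypothetical working implementation.
type recordingExecutor struct{ ran []string }

func (r *recordingExecutor) Execute(id string) error {
	r.ran = append(r.ran, id)
	return nil
}

// runAll depends only on the interface, so either implementation fits.
func runAll(ex RunExecutor, ids []string) error {
	for _, id := range ids {
		if err := ex.Execute(id); err != nil {
			return fmt.Errorf("run %s: %w", id, err)
		}
	}
	return nil
}

func main() {
	ids := []string{"run-1", "run-2"}
	rec := &recordingExecutor{}
	fmt.Println(runAll(nullRunExecutor{}, ids), runAll(rec, ids), rec.ran)
}
```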

type NullRunManager added in v0.10.7

type NullRunManager struct{}

NullRunManager implements Null pattern for RunManager interface

func (NullRunManager) Cancel added in v0.10.7

func (NullRunManager) Cancel(runID uuid.UUID) (*models.JobRun, error)

func (NullRunManager) Create added in v0.10.7

func (NullRunManager) Create(jobSpecID models.JobID, initiator *models.Initiator, creationHeight *big.Int, runRequest *models.RunRequest) (*models.JobRun, error)

func (NullRunManager) CreateErrored added in v0.10.7

func (NullRunManager) CreateErrored(jobSpecID models.JobID, initiator models.Initiator, err error) (*models.JobRun, error)

func (NullRunManager) ResumeAllInProgress added in v0.10.7

func (NullRunManager) ResumeAllInProgress() error

func (NullRunManager) ResumeAllPendingConnection added in v0.10.7

func (NullRunManager) ResumeAllPendingConnection() error

func (NullRunManager) ResumeAllPendingNextBlock added in v0.10.7

func (NullRunManager) ResumeAllPendingNextBlock(currentBlockHeight *big.Int) error

func (NullRunManager) ResumePendingBridge added in v0.10.7

func (NullRunManager) ResumePendingBridge(runID uuid.UUID, input models.BridgeRunResult) error

type NullRunQueue added in v0.10.7

type NullRunQueue struct{}

NullRunQueue implements Null pattern for RunQueue interface

func (NullRunQueue) Close added in v0.10.8

func (NullRunQueue) Close() error

func (NullRunQueue) Healthy added in v0.10.8

func (NullRunQueue) Healthy() error

func (NullRunQueue) Ready added in v0.10.8

func (NullRunQueue) Ready() error

func (NullRunQueue) Run added in v0.10.7

func (NullRunQueue) Run(uuid.UUID)

func (NullRunQueue) Start added in v0.10.7

func (NullRunQueue) Start() error

func (NullRunQueue) WorkerCount added in v0.10.7

func (NullRunQueue) WorkerCount() int

type OneTime

type OneTime struct {
	Store      *store.Store
	Clock      utils.Afterer
	RunManager RunManager
	// contains filtered or unexported fields
}

OneTime represents runs that are to be executed only once.

func (*OneTime) AddJob

func (ot *OneTime) AddJob(job models.JobSpec)

AddJob runs the job at the time specified for the "runat" initiator.

func (*OneTime) RunJobAt

func (ot *OneTime) RunJobAt(initiator models.Initiator, job models.JobSpec)

RunJobAt waits until either Stop() has been called or the time specified for the run has arrived.

func (*OneTime) Start

func (ot *OneTime) Start() error

Start allocates a channel for the "done" field with an empty struct.

func (*OneTime) Stop

func (ot *OneTime) Stop()

Stop closes the "done" field's channel.
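Start, Stop, and RunJobAt together describe a common Go idiom: a "done" channel that is closed to cancel any pending waits, combined with a timer in a `select`. The sketch below shows that idiom in isolation. It is a simplified stand-in rather than the real type: `oneTime` and `runAfter` are hypothetical, the standard `time.After` replaces the `Clock` field, and the delay is passed directly where a caller would compute it from the target time.

```go
package main

import (
	"fmt"
	"time"
)

// oneTime is a simplified stand-in for OneTime.
type oneTime struct {
	done chan struct{}
}

// Start allocates the done channel.
func (ot *oneTime) Start() { ot.done = make(chan struct{}) }

// Stop closes the done channel, releasing every pending wait.
func (ot *oneTime) Stop() { close(ot.done) }

// runAfter blocks until Stop is called or the delay elapses, whichever
// comes first. It reports whether run was executed.
func (ot *oneTime) runAfter(delay time.Duration, run func()) bool {
	select {
	case <-ot.done:
		return false
	case <-time.After(delay):
		run()
		return true
	}
}

func main() {
	ot := &oneTime{}
	ot.Start()
	at := time.Now().Add(10 * time.Millisecond)
	fmt.Println(ot.runAfter(time.Until(at), func() { fmt.Println("run executed") }))
	ot.Stop()
	fmt.Println(ot.runAfter(time.Hour, func() { fmt.Println("never printed") }))
}
```

Closing the channel, rather than sending on it, is what lets a single Stop call cancel any number of waiting goroutines.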

type PrometheusBackend added in v0.9.6

type PrometheusBackend interface {
	SetUnconfirmedTransactions(int64)
	SetMaxUnconfirmedAge(float64)
	SetMaxUnconfirmedBlocks(int64)
	SetPipelineRunsQueued(n int)
	SetPipelineTaskRunsQueued(n int)
}
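Since NewPromReporter accepts PrometheusBackend values, an in-memory implementation is one way to observe what gets reported without a metrics registry. The sketch below copies the documented method set into a local interface. `memoryBackend` and `report` are hypothetical, and the unit of the age value is not stated in this documentation, so the sketch leaves it unspecified.

```go
package main

import "fmt"

// PrometheusBackend copies the documented method set.
type PrometheusBackend interface {
	SetUnconfirmedTransactions(int64)
	SetMaxUnconfirmedAge(float64)
	SetMaxUnconfirmedBlocks(int64)
	SetPipelineRunsQueued(n int)
	SetPipelineTaskRunsQueued(n int)
}

// memoryBackend is a hypothetical backend that keeps the latest values.
type memoryBackend struct {
	unconfirmedTxs       int64
	maxUnconfirmedAge    float64
	maxUnconfirmedBlocks int64
	runsQueued           int
	taskRunsQueued       int
}

func (m *memoryBackend) SetUnconfirmedTransactions(n int64) { m.unconfirmedTxs = n }
func (m *memoryBackend) SetMaxUnconfirmedAge(a float64)     { m.maxUnconfirmedAge = a }
func (m *memoryBackend) SetMaxUnconfirmedBlocks(n int64)    { m.maxUnconfirmedBlocks = n }
func (m *memoryBackend) SetPipelineRunsQueued(n int)        { m.runsQueued = n }
func (m *memoryBackend) SetPipelineTaskRunsQueued(n int)    { m.taskRunsQueued = n }

// report is a hypothetical stand-in for a reporter pushing one sample.
func report(b PrometheusBackend, txs int64, age float64, blocks int64) {
	b.SetUnconfirmedTransactions(txs)
	b.SetMaxUnconfirmedAge(age)
	b.SetMaxUnconfirmedBlocks(blocks)
}

func main() {
	m := &memoryBackend{}
	report(m, 3, 42.5, 7)
	m.SetPipelineRunsQueued(2)
	m.SetPipelineTaskRunsQueued(5)
	fmt.Printf("%+v\n", *m)
}
```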

type Recurring

type Recurring struct {
	Cron  Cron
	Clock utils.Nower
	// contains filtered or unexported fields
}

Recurring is used for runs that need to execute on a schedule, and is configured with cron. Instances of Recurring must be initialized using NewRecurring().

func NewRecurring

func NewRecurring(runManager RunManager) *Recurring

NewRecurring creates a new instance of Recurring, ready to use.

func (*Recurring) AddJob

func (r *Recurring) AddJob(job models.JobSpec)

AddJob looks for "cron" initiators and adds them to cron's schedule for execution at the specified times.

func (*Recurring) Start

func (r *Recurring) Start() error

Start for Recurring types executes tasks with a "cron" initiator based on the configured schedule for the run.

func (*Recurring) Stop

func (r *Recurring) Stop()

Stop stops the cron scheduler and waits for running jobs to finish.

type RecurringScheduleJobError

type RecurringScheduleJobError struct {
	// contains filtered or unexported fields
}

RecurringScheduleJobError contains the field for the error message.

func (RecurringScheduleJobError) Error

func (err RecurringScheduleJobError) Error() string

Error returns the error message for the run.
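A dedicated error type such as RecurringScheduleJobError pairs naturally with a predicate like ExpectedRecurringScheduleJobError, which lets callers tell an anticipated scheduling failure apart from other errors. The sketch below shows one way to write such a pair using `errors.As`. All names are hypothetical, and whether the real predicate also matches wrapped errors is not stated in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// scheduleError is a stand-in for RecurringScheduleJobError.
type scheduleError struct{ msg string }

// Error returns the stored message.
func (e scheduleError) Error() string { return e.msg }

// isScheduleError reports whether err is, or wraps, a scheduleError.
func isScheduleError(err error) bool {
	var target scheduleError
	return errors.As(err, &target)
}

// wrap adds context while keeping the original error discoverable.
func wrap(err error) error { return fmt.Errorf("adding job: %w", err) }

// errPlain is an unrelated error used for comparison.
var errPlain = errors.New("plain failure")

func main() {
	err := wrap(scheduleError{msg: "cron schedule has already ended"})
	fmt.Println(isScheduleError(err), isScheduleError(errPlain), isScheduleError(nil))
}
```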

type RunExecutor added in v0.8.2

type RunExecutor interface {
	Execute(uuid.UUID) error
}

RunExecutor handles the actual running of the job tasks

func NewRunExecutor added in v0.8.2

func NewRunExecutor(store *store.Store, keyStore *keystore.Master, statsPusher synchronization.StatsPusher) RunExecutor

NewRunExecutor initializes a RunExecutor.

type RunManager added in v0.8.2

type RunManager interface {
	Create(
		jobSpecID models.JobID,
		initiator *models.Initiator,
		creationHeight *big.Int,
		runRequest *models.RunRequest) (*models.JobRun, error)
	CreateErrored(
		jobSpecID models.JobID,
		initiator models.Initiator,
		err error) (*models.JobRun, error)
	ResumePendingBridge(
		runID uuid.UUID,
		input models.BridgeRunResult) error
	Cancel(runID uuid.UUID) (*models.JobRun, error)

	ResumeAllInProgress() error
	ResumeAllPendingNextBlock(currentBlockHeight *big.Int) error
	ResumeAllPendingConnection() error
}

RunManager supplies methods for queueing, resuming and cancelling jobs in the RunQueue

func NewRunManager added in v0.8.2

func NewRunManager(
	runQueue RunQueue,
	config orm.ConfigReader,
	orm *orm.ORM,
	statsPusher synchronization.StatsPusher,
	clock utils.AfterNower) RunManager

NewRunManager returns a new job manager

type RunQueue added in v0.8.2

type RunQueue interface {
	service.Service

	Run(uuid.UUID)

	WorkerCount() int
}

RunQueue safely handles coordinating job runs.

func NewRunQueue added in v0.8.2

func NewRunQueue(runExecutor RunExecutor) RunQueue

NewRunQueue initializes a RunQueue.

type Scheduler

type Scheduler struct {
	Recurring *Recurring
	OneTime   *OneTime

	utils.StartStopOnce
	// contains filtered or unexported fields
}

Scheduler contains fields for Recurring and OneTime for occurrences, a pointer to the store and a started field to indicate if the Scheduler has started or not.

func NewScheduler

func NewScheduler(store *store.Store, runManager RunManager) *Scheduler

NewScheduler initializes the Scheduler instances with both Recurring and OneTime fields since jobs can contain tasks which utilize both.

func (*Scheduler) AddJob

func (s *Scheduler) AddJob(job models.JobSpec)

AddJob is the governing function for Recurring and OneTime, and will only execute if the Scheduler has started.

func (*Scheduler) Start

func (s *Scheduler) Start() error

Start checks to ensure the Scheduler has not already started, calls the Start function for both Recurring and OneTime types, sets the started field to true, and adds jobs relevant to its initiator ("cron" and "runat").

func (*Scheduler) Stop

func (s *Scheduler) Stop() error

Stop is the governing function for both the Recurring and OneTime Stop functions. It sets the started field to false.
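The Start, AddJob, and Stop descriptions amount to a small lifecycle guard: Start refuses to run twice, AddJob has an effect only while started, and Stop clears the flag. A minimal sketch of that guard follows. It is an assumption-laden illustration: `scheduler` is a hypothetical stand-in, a mutex-protected boolean replaces `utils.StartStopOnce`, jobs are plain strings, and AddJob returns a bool purely so the example can show whether a job was accepted (the documented AddJob returns nothing).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// scheduler is a simplified stand-in for Scheduler.
type scheduler struct {
	mu      sync.Mutex
	started bool
	jobs    []string
}

// Start fails if the scheduler is already running.
func (s *scheduler) Start() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.started {
		return errors.New("scheduler already started")
	}
	s.started = true
	return nil
}

// AddJob accepts the job only while the scheduler is started.
func (s *scheduler) AddJob(job string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.started {
		return false
	}
	s.jobs = append(s.jobs, job)
	return true
}

// Stop clears the started flag.
func (s *scheduler) Stop() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.started = false
	return nil
}

func main() {
	s := &scheduler{}
	fmt.Println(s.AddJob("early")) // rejected: not started yet
	fmt.Println(s.Start(), s.Start())
	fmt.Println(s.AddJob("cron-job"))
	fmt.Println(s.Stop(), s.AddJob("late"))
}
```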

Directories

Path         Synopsis
eth
job
log
signatures
cryptotest   Package cryptotest provides convenience functions for kyber-based APIs.
ethdss       Package ethdss implements the Distributed Schnorr Signature protocol from the ... XXX: Do not use in production until this code has been audited.
ethschnorr   Package ethschnorr implements a version of the Schnorr signature which is ... XXX: Do not use in production until this code has been audited.
secp256k1    Package secp256k1 is an implementation of the kyber.{Group,Point,Scalar} ... XXX: Do not use in production until this code has been audited.
vrf