Back to

Package services

Latest Go to latest
Published: Jul 27, 2020 | License: MIT | Module:


Package services contain the key components of the Chainlink node. This includes the Application, JobRunner, LogListener, and Scheduler.


The Application is the main component used for starting and stopping the Chainlink node.


The JobRunner keeps track of Runs within a Job and ensures that they're executed in order. Within each Run, the tasks are also executed from the JobRunner.


The JobSubscriber coordinates running job events with the EventLog in the Store, and also subscribes to the given address on the Ethereum blockchain.


The Scheduler ensures that recurring events are executed according to their schedule, and one-time events occur only when the specified time has passed.


Package Files

func ExpectedRecurringScheduleJobError

func ExpectedRecurringScheduleJobError(err error) bool

func NewRun

func NewRun(
	job *models.JobSpec,
	initiator *models.Initiator,
	currentHeight *big.Int,
	runRequest *models.RunRequest,
	config orm.ConfigReader,
	orm *orm.ORM,
	now time.Time) (*models.JobRun, []*adapters.PipelineAdapter)

NewRun returns a complete run from a JobSpec

func ReceiveLogRequest

func ReceiveLogRequest(runManager RunManager, le models.LogRequest)

ReceiveLogRequest parses the log and runs the job it indicated by its GetJobSpecID method

func ValidateBridgeType

func ValidateBridgeType(bt *models.BridgeTypeRequest, store *store.Store) error

ValidateBridgeType checks that the bridge type doesn't have a duplicate or invalid name or invalid url

func ValidateBridgeTypeNotExist

func ValidateBridgeTypeNotExist(bt *models.BridgeTypeRequest, store *store.Store) error

ValidateBridgeTypeNotExist checks that a bridge has not already been created

func ValidateExternalInitiator

func ValidateExternalInitiator(
	exi *models.ExternalInitiatorRequest,
	store *store.Store,
) error

ValidateExternalInitiator checks whether External Initiator parameters are safe for processing.

func ValidateInitiator

func ValidateInitiator(i models.Initiator, j models.JobSpec, store *store.Store) error

ValidateInitiator checks the Initiator for any application logic errors.

func ValidateJob

func ValidateJob(j models.JobSpec, store *store.Store) error

ValidateJob checks the job and its associated Initiators and Tasks for any application logic errors.

func ValidateRun

func ValidateRun(run *models.JobRun, contractCost *assets.Link)

ValidateRun ensures that a run's initial preconditions have been met

func ValidateServiceAgreement

func ValidateServiceAgreement(sa models.ServiceAgreement, store *store.Store) error

ValidateServiceAgreement checks the ServiceAgreement for any application logic errors.

type BalanceMonitor

type BalanceMonitor interface {
	GetEthBalance(gethCommon.Address) *assets.Eth

BalanceMonitor checks the balance for each key on every new head

func NewBalanceMonitor

func NewBalanceMonitor(store *store.Store, gethClientWrapper store.GethClientWrapper) BalanceMonitor

NewBalanceMonitor returns a new balanceMonitor

type Cron

type Cron interface {
	Stop() context.Context
	AddFunc(string, func()) (cron.EntryID, error)

type GasUpdater

type GasUpdater interface {
	RollingBlockHistory() []models.Block

GasUpdater listens for new heads and updates the base gas price dynamically based on the configured percentile of gas prices in that block

func NewGasUpdater

func NewGasUpdater(store *store.Store) GasUpdater

NewGasUpdater returns a new gas updater.

type HeadTracker

type HeadTracker struct {
	// contains filtered or unexported fields

HeadTracker holds and stores the latest block number experienced by this particular node in a thread safe manner. Reconstitutes the last block number from the data store on reboot.

func NewHeadTracker

func NewHeadTracker(store *strpkg.Store, callbacks []strpkg.HeadTrackable, sleepers ...utils.Sleeper) *HeadTracker

NewHeadTracker instantiates a new HeadTracker using the orm to persist new block numbers. Can be passed in an optional sleeper object that will dictate how often it tries to reconnect.

func (*HeadTracker) Connected

func (ht *HeadTracker) Connected() bool

Connected returns whether or not this HeadTracker is connected.

func (*HeadTracker) GetChainWithBackfill

func (ht *HeadTracker) GetChainWithBackfill(ctx context.Context, head models.Head, depth uint) (models.Head, error)

GetChainWithBackfill returns a chain of the given length, backfilling any heads that may be missing from the database

func (*HeadTracker) HighestSeenHead

func (ht *HeadTracker) HighestSeenHead() *models.Head

HighestSeenHead returns the block header with the highest number that has been seen, or nil

func (*HeadTracker) Save

func (ht *HeadTracker) Save(h models.Head) error

Save updates the latest block number, if indeed the latest, and persists this number in case of reboot. Thread safe.

func (*HeadTracker) Start

func (ht *HeadTracker) Start() error

Start retrieves the last persisted block number from the HeadTracker, subscribes to new heads, and if successful fires Connect on the HeadTrackable argument.

func (*HeadTracker) Stop

func (ht *HeadTracker) Stop() error

Stop unsubscribes all connections and fires Disconnect.

type InitiatorSubscription

type InitiatorSubscription struct {

	Initiator models.Initiator
	// contains filtered or unexported fields

InitiatorSubscription encapsulates all functionality needed to wrap an ethereum subscription for use with a Chainlink Initiator. Initiator specific functionality is delegated to the callback.

func NewInitiatorSubscription

func NewInitiatorSubscription(
	initr models.Initiator,
	client eth.Client,
	runManager RunManager,
	nextHead *big.Int,
	callback func(RunManager, models.LogRequest),
) (InitiatorSubscription, error)

NewInitiatorSubscription creates a new InitiatorSubscription that feeds received logs to the callback func parameter.

type JobSubscriber

type JobSubscriber interface {
	AddJob(job models.JobSpec, bn *models.Head) error
	RemoveJob(ID *models.ID) error
	Jobs() []models.JobSpec
	Stop() error

JobSubscriber listens for push notifications of event logs from the ethereum node's websocket for specific jobs by subscribing to ethLogs.

func NewJobSubscriber

func NewJobSubscriber(store *store.Store, runManager RunManager) JobSubscriber

NewJobSubscriber returns a new job subscriber.

type JobSubscription

type JobSubscription struct {
	Job models.JobSpec
	// contains filtered or unexported fields

JobSubscription listens to event logs being pushed from the Ethereum Node to a job.

func StartJobSubscription

func StartJobSubscription(job models.JobSpec, head *models.Head, store *strpkg.Store, runManager RunManager) (JobSubscription, error)

StartJobSubscription constructs a JobSubscription which listens for and tracks event logs corresponding to the specified job. Ignores any errors if there is at least one successful subscription to an initiator log.

func (JobSubscription) Unsubscribe

func (js JobSubscription) Unsubscribe()

Unsubscribe stops the subscription and cleans up associated resources.

type ManagedSubscription

type ManagedSubscription struct {
	// contains filtered or unexported fields

ManagedSubscription encapsulates the connecting, backfilling, and clean up of an ethereum node subscription.

func NewManagedSubscription

func NewManagedSubscription(
	logSubscriber eth.LogSubscriber,
	filter ethereum.FilterQuery,
	callback func(models.Log),
) (*ManagedSubscription, error)

NewManagedSubscription subscribes to the ethereum node with the passed filter and delegates incoming logs to callback.

func (ManagedSubscription) Unsubscribe

func (sub ManagedSubscription) Unsubscribe()

Unsubscribe closes channels and cleans up resources.

type OneTime

type OneTime struct {
	Store      *store.Store
	Clock      utils.Afterer
	RunManager RunManager
	// contains filtered or unexported fields

OneTime represents runs that are to be executed only once.

func (*OneTime) AddJob

func (ot *OneTime) AddJob(job models.JobSpec)

AddJob runs the job at the time specified for the "runat" initiator.

func (*OneTime) RunJobAt

func (ot *OneTime) RunJobAt(initiator models.Initiator, job models.JobSpec)

RunJobAt wait until the Stop() function has been called on the run or the specified time for the run is after the present time.

func (*OneTime) Start

func (ot *OneTime) Start() error

Start allocates a channel for the "done" field with an empty struct.

func (*OneTime) Stop

func (ot *OneTime) Stop()

Stop closes the "done" field's channel.

type RawHead

type RawHead struct {
	Number     string
	Hash       string
	ParentHash string
	Timestamp  string

type Recurring

type Recurring struct {
	Cron  Cron
	Clock utils.Nower
	// contains filtered or unexported fields

Recurring is used for runs that need to execute on a schedule, and is configured with cron. Instances of Recurring must be initialized using NewRecurring().

func NewRecurring

func NewRecurring(runManager RunManager) *Recurring

NewRecurring create a new instance of Recurring, ready to use.

func (*Recurring) AddJob

func (r *Recurring) AddJob(job models.JobSpec)

AddJob looks for "cron" initiators, adds them to cron's schedule for execution when specified.

func (*Recurring) Start

func (r *Recurring) Start() error

Start for Recurring types executes tasks with a "cron" initiator based on the configured schedule for the run.

func (*Recurring) Stop

func (r *Recurring) Stop()

Stop stops the cron scheduler and waits for running jobs to finish.

type RecurringScheduleJobError

type RecurringScheduleJobError struct {
	// contains filtered or unexported fields

RecurringScheduleJobError contains the field for the error message.

func (RecurringScheduleJobError) Error

func (err RecurringScheduleJobError) Error() string

Error returns the error message for the run.

type RunExecutor

type RunExecutor interface {
	Execute(*models.ID) error

RunExecutor handles the actual running of the job tasks

func NewRunExecutor

func NewRunExecutor(store *store.Store, statsPusher synchronization.StatsPusher) RunExecutor

NewRunExecutor initializes a RunExecutor.

type RunManager

type RunManager interface {
		jobSpecID *models.ID,
		initiator *models.Initiator,
		creationHeight *big.Int,
		runRequest *models.RunRequest) (*models.JobRun, error)
		jobSpecID *models.ID,
		initiator models.Initiator,
		err error) (*models.JobRun, error)
		runID *models.ID,
		input models.BridgeRunResult) error
	Cancel(runID *models.ID) (*models.JobRun, error)

	ResumeAllInProgress() error
	ResumeAllPendingNextBlock(currentBlockHeight *big.Int) error
	ResumeAllPendingConnection() error

RunManager supplies methods for queueing, resuming and cancelling jobs in the RunQueue

func NewRunManager

func NewRunManager(
	runQueue RunQueue,
	config orm.ConfigReader,
	orm *orm.ORM,
	statsPusher synchronization.StatsPusher,
	txManager store.TxManager,
	clock utils.AfterNower) RunManager

NewRunManager returns a new job manager

type RunQueue

type RunQueue interface {
	Start() error

	WorkerCount() int

RunQueue safely handles coordinating job runs.

func NewRunQueue

func NewRunQueue(runExecutor RunExecutor) RunQueue

NewRunQueue initializes a RunQueue.

type Scheduler

type Scheduler struct {
	Recurring *Recurring
	OneTime   *OneTime
	// contains filtered or unexported fields

Scheduler contains fields for Recurring and OneTime for occurrences, a pointer to the store and a started field to indicate if the Scheduler has started or not.

func NewScheduler

func NewScheduler(store *store.Store, runManager RunManager) *Scheduler

NewScheduler initializes the Scheduler instances with both Recurring and OneTime fields since jobs can contain tasks which utilize both.

func (*Scheduler) AddJob

func (s *Scheduler) AddJob(job models.JobSpec)

AddJob is the governing function for Recurring and OneTime, and will only execute if the Scheduler has not already started.

func (*Scheduler) Start

func (s *Scheduler) Start() error

Start checks to ensure the Scheduler has not already started, calls the Start function for both Recurring and OneTime types, sets the started field to true, and adds jobs relevant to its initiator ("cron" and "runat").

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop is the governing function for both Recurring and OneTime Stop function. Sets the started field to false.

type SleeperTask

type SleeperTask interface {
	Stop() error

SleeperTask represents a task that waits in the background to process some work.

func NewSleeperTask

func NewSleeperTask(worker Worker) SleeperTask

NewSleeperTask takes a worker and returns a SleeperTask.

SleeperTask is guaranteed to call Work on the worker at least once for every WakeUp call. If the Worker is busy when WakeUp is called, the Worker will be called again immediately after it is finished. For this reason you should take care to make sure that Worker is idempotent. WakeUp does not block.

func NewStoreReaper

func NewStoreReaper(store *store.Store) SleeperTask

NewStoreReaper creates a reaper that cleans stale objects from the store.

type Unsubscriber

type Unsubscriber interface {

Unsubscriber is the interface for all subscriptions, allowing one to unsubscribe.

type Worker

type Worker interface {

Worker is a simple interface that represents some work to do repeatedly

Documentation was rendered with GOOS=linux and GOARCH=amd64.

Jump to identifier

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to identifier