services

package
v0.0.0-...-5cb58b4
Warning

This package is not in the latest version of its module.

Published: Sep 20, 2018 License: MIT Imports: 25 Imported by: 0

Documentation

Overview

Package services contains the key components of the Chainlink node. This includes the Application, JobRunner, JobSubscriber, and Scheduler.

Application

The Application is the main component used for starting and stopping the Chainlink node.

JobRunner

The JobRunner keeps track of Runs within a Job and ensures that they're executed in order. The JobRunner also executes the tasks within each Run.

JobSubscriber

The JobSubscriber coordinates running job events with the EventLog in the Store, and also subscribes to the given address on the Ethereum blockchain.

Scheduler

The Scheduler ensures that recurring events are executed according to their schedule, and one-time events occur only when the specified time has passed.

Constants

const (
	RunLogTopicSignature = iota
	RunLogTopicInternalID
	RunLogTopicJobID
	RunLogTopicAmount
)

Descriptive indices of a RunLog's Topic array.
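
As a rough illustration of how these indices might be used, the sketch below reads one entry out of a log's topic list by name instead of by a bare number. The constants are local copies so that the example compiles on its own; the `topicAt` helper and the string placeholder topics are assumptions made for the example and are not part of this package.

```go
package main

import "fmt"

// Local copies of the package's index constants, reproduced so that the
// sketch compiles on its own.
const (
	RunLogTopicSignature = iota
	RunLogTopicInternalID
	RunLogTopicJobID
	RunLogTopicAmount
)

// topicAt is a hypothetical helper: it returns the topic at idx, or false
// when the log carries too few topics.
func topicAt(topics []string, idx int) (string, bool) {
	if idx < 0 || idx >= len(topics) {
		return "", false
	}
	return topics[idx], true
}

func main() {
	// Placeholder values; a real log carries 32-byte hashes here.
	topics := []string{"signature", "internal-id", "job-id", "amount"}
	if jobID, ok := topicAt(topics, RunLogTopicJobID); ok {
		fmt.Println("job ID topic:", jobID)
	}
}
```

Checking the length before indexing avoids a panic on logs that carry fewer topics than a RunLog is expected to have.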

const OracleFulfillmentFunctionID = "0x76005c26"

OracleFulfillmentFunctionID is the function ID of the oracle fulfillment method used by EthTx: bytes4(keccak256("fulfillData(uint256,bytes32)")). Kept in sync with solidity/contracts/Oracle.sol.
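
To show where a 4-byte function ID ends up, here is a minimal sketch of standard ABI call data for a `(uint256,bytes32)` signature: the function ID followed by two 32-byte words. It does not reproduce how EthTx builds its transaction; the helper names, the argument names `requestID` and `data`, and the sample payload are assumptions made for the example.

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"strings"
)

// OracleFulfillmentFunctionID is copied from the constant documented above.
const OracleFulfillmentFunctionID = "0x76005c26"

// uint64Word left-pads n into a 32-byte big-endian word, which is how the
// ABI lays out a uint256. (Hypothetical helper; values wider than 64 bits
// would need math/big.)
func uint64Word(n uint64) [32]byte {
	var w [32]byte
	binary.BigEndian.PutUint64(w[24:], n)
	return w
}

// encodeFulfillData concatenates the 4-byte function ID with two 32-byte
// arguments. The argument names are assumptions for illustration.
func encodeFulfillData(requestID, data [32]byte) ([]byte, error) {
	selector, err := hex.DecodeString(strings.TrimPrefix(OracleFulfillmentFunctionID, "0x"))
	if err != nil {
		return nil, fmt.Errorf("decoding function ID: %w", err)
	}
	calldata := make([]byte, 0, len(selector)+64)
	calldata = append(calldata, selector...)
	calldata = append(calldata, requestID[:]...)
	calldata = append(calldata, data[:]...)
	return calldata, nil
}

func main() {
	var data [32]byte
	copy(data[:], "hello") // sample payload, right-padded with zeros
	calldata, err := encodeFulfillData(uint64Word(1), data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("0x%x (%d bytes)\n", calldata, len(calldata))
}
```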

Variables

var RunLogTopic = common.HexToHash("0x3fab86a1207bdcfe3976d0d9df25f263d45ae8d381a60960559771a2b223974d")

RunLogTopic is the signature for the RunRequest(...) event which Chainlink RunLog initiators watch for. See https://github.com/smartcontractkit/chainlink/blob/master/solidity/contracts/Oracle.sol If updating this, be sure to update the truffle suite's "expected event signature" test.

Functions

func BeginRun

func BeginRun(
	job models.JobSpec,
	initr models.Initiator,
	input models.RunResult,
	store *store.Store,
) (models.JobRun, error)

BeginRun creates a new run if the job is valid and starts the job.

func BeginRunAtBlock

func BeginRunAtBlock(
	job models.JobSpec,
	initr models.Initiator,
	input models.RunResult,
	store *store.Store,
	bn *models.IndexableBlockNumber,
) (models.JobRun, error)

BeginRunAtBlock builds and executes a new run if the job is valid with the block number to determine if tasks should be resumed.

func BuildRun

func BuildRun(job models.JobSpec, i models.Initiator, store *store.Store) (models.JobRun, error)

BuildRun checks to ensure the given job has not started or ended before creating a new run for the job.

func ExecuteRun

func ExecuteRun(jr models.JobRun, store *store.Store, overrides models.RunResult) (models.JobRun, error)

ExecuteRun calls ExecuteRunAtBlock without an IndexableBlockNumber.

func ExecuteRunAtBlock

func ExecuteRunAtBlock(
	jr models.JobRun,
	store *store.Store,
	overrides models.RunResult,
	bn *models.IndexableBlockNumber,
) (models.JobRun, error)

ExecuteRunAtBlock starts the job and executes task runs within that job in the order defined in the run for as long as they do not return errors. Results are saved in the store (db).
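
The ordering and stop-on-error behavior described above can be sketched without any Chainlink types. In this minimal example, `task`, `runTasks`, and the three sample tasks are hypothetical names, a slice stands in for the store, and feeding each result into the next task is an assumption rather than something this documentation states.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// task is a hypothetical stand-in for one task run: it receives the
// previous result and returns its own.
type task func(input string) (string, error)

// Sample tasks used only for illustration.
var (
	upper   task = func(in string) (string, error) { return strings.ToUpper(in), nil }
	exclaim task = func(in string) (string, error) { return in + "!", nil }
	fail    task = func(string) (string, error) { return "", errors.New("adapter failed") }
)

// runTasks executes tasks in order and stops at the first error. Results
// gathered before the failure are returned alongside the error.
func runTasks(tasks []task, input string) ([]string, error) {
	var saved []string // stands in for results saved to the store
	current := input
	for i, t := range tasks {
		out, err := t(current)
		if err != nil {
			return saved, fmt.Errorf("task %d: %w", i, err)
		}
		saved = append(saved, out)
		current = out
	}
	return saved, nil
}

func main() {
	results, err := runTasks([]task{upper, fail, exclaim}, "ok")
	fmt.Println(results, err)
}
```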

func NewInitiatorFilterQuery

func NewInitiatorFilterQuery(
	initr models.Initiator,
	head *models.IndexableBlockNumber,
	topics [][]common.Hash,
) ethereum.FilterQuery

NewInitiatorFilterQuery returns a new ethereum.FilterQuery built from the given initiator, head, and topics.

func TopicFiltersForRunLog

func TopicFiltersForRunLog(jobID string) [][]common.Hash

TopicFiltersForRunLog generates the two variations of RunLog IDs that could possibly be entered: the ID hex encoded, and the ID zero padded.
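
One plausible reading of the two variations, sketched with the standard library only: a 32-character job ID can fill a 32-byte topic either as its ASCII characters, or as its 16 decoded bytes padded with zeros. This is an assumption about the encoding, not a copy of the package's implementation; the function name, the 32-character length check, and the choice of right-padding are all hypothetical.

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// jobIDTopicVariants returns two 32-byte candidates for a job ID topic.
// asText holds the ID's ASCII characters; asBytes holds the hex-decoded ID
// followed by zero bytes. Both layouts are assumptions for illustration.
func jobIDTopicVariants(jobID string) (asText, asBytes [32]byte, err error) {
	if len(jobID) != 32 {
		return asText, asBytes, fmt.Errorf("expected a 32-character job ID, got %d characters", len(jobID))
	}
	raw, err := hex.DecodeString(jobID)
	if err != nil {
		return asText, asBytes, fmt.Errorf("job ID is not hex: %w", err)
	}
	copy(asText[:], jobID)
	copy(asBytes[:], raw) // remaining 16 bytes stay zero
	return asText, asBytes, nil
}

func main() {
	asText, asBytes, err := jobIDTopicVariants("0123456789abcdef0123456789abcdef")
	if err != nil {
		panic(err)
	}
	fmt.Printf("ASCII form:       0x%x\n", asText)
	fmt.Printf("zero-padded form: 0x%x\n", asBytes)
}
```

Filtering on both candidates lets a subscription match a log regardless of which form the requester used.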

func ValidateAdapter

func ValidateAdapter(bt *models.BridgeType, store *store.Store) (err error)

ValidateAdapter checks that the bridge type doesn't have a duplicate or invalid name.

func ValidateInitiator

func ValidateInitiator(i models.Initiator, j models.JobSpec) error

ValidateInitiator checks the Initiator for any application logic errors.

func ValidateJob

func ValidateJob(j models.JobSpec, store *store.Store) error

ValidateJob checks the job and its associated Initiators and Tasks for any application logic errors.

func ValidateServiceAgreement

func ValidateServiceAgreement(sa models.ServiceAgreement, store *store.Store) error

ValidateServiceAgreement checks the ServiceAgreement for any application logic errors.

Types

type Afterer

type Afterer interface {
	After(d time.Duration) <-chan time.Time
}

Afterer is an interface that fulfills the After method, following the behavior of time.After.
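
An interface like this is typically useful for substituting a fake clock in tests. The sketch below assumes that use: `instantClock` and `waitFor` are hypothetical names, and the interface is re-declared locally so that the example is self-contained.

```go
package main

import (
	"fmt"
	"time"
)

// Afterer is re-declared here so that the sketch compiles on its own.
type Afterer interface {
	After(d time.Duration) <-chan time.Time
}

// instantClock is a hypothetical test double: it records each requested
// duration and fires immediately instead of waiting.
type instantClock struct {
	requested []time.Duration
}

func (c *instantClock) After(d time.Duration) <-chan time.Time {
	c.requested = append(c.requested, d)
	ch := make(chan time.Time, 1)
	ch <- time.Unix(0, 0).Add(d)
	return ch
}

// waitFor blocks until the clock fires, as code using time.After would.
func waitFor(clock Afterer, d time.Duration) time.Time {
	return <-clock.After(d)
}

func main() {
	clock := &instantClock{}
	fired := waitFor(clock, time.Hour) // returns at once despite the one-hour request
	fmt.Println("fired at", fired.UTC(), "after requests", clock.requested)
}
```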

type Application

type Application interface {
	Start() error
	Stop() error
	GetStore() *store.Store
}

Application implements the common functions used in the core node.

func NewApplication

func NewApplication(config store.Config) Application

NewApplication initializes a new store if one is not already present at the configured root directory (default: ~/.chainlink), initializes the logger in the same directory, and returns the Application to be used by the node.

type ChainlinkApplication

type ChainlinkApplication struct {
	Exiter        func(int)
	HeadTracker   *HeadTracker
	JobRunner     JobRunner
	JobSubscriber JobSubscriber
	Scheduler     *Scheduler
	Store         *store.Store
	Reaper        Reaper
	// contains filtered or unexported fields
}

ChainlinkApplication contains fields for the JobSubscriber, Scheduler, and Store. The JobSubscriber and Scheduler are also available in the services package, but the Store has its own package.

func (*ChainlinkApplication) AddAdapter

func (app *ChainlinkApplication) AddAdapter(bt *models.BridgeType) error

AddAdapter adds an adapter to the store. If another adapter with the same name already exists the adapter will not be added.

func (*ChainlinkApplication) AddJob

func (app *ChainlinkApplication) AddJob(job models.JobSpec) error

AddJob adds a job to the store and the scheduler. If there was an error from adding the job to the store, the job will not be added to the scheduler.

func (*ChainlinkApplication) GetStore

func (app *ChainlinkApplication) GetStore() *store.Store

GetStore returns the pointer to the store for the ChainlinkApplication.

func (*ChainlinkApplication) RemoveAdapter

func (app *ChainlinkApplication) RemoveAdapter(bt *models.BridgeType) error

RemoveAdapter removes an adapter from the store.

func (*ChainlinkApplication) Start

func (app *ChainlinkApplication) Start() error

Start runs the JobSubscriber and Scheduler. If successful, nil will be returned. Also listens for interrupt signals from the operating system so that the application can be properly closed before the application exits.

func (*ChainlinkApplication) Stop

func (app *ChainlinkApplication) Stop() error

Stop allows the application to exit by halting schedules, closing logs, and closing the DB connection.

type Cron

type Cron interface {
	Start()
	Stop()
	AddFunc(string, func()) error
}

Cron is an interface for scheduling recurring functions to run. Cron's schedule format is similar to the standard cron format but with an extra field at the beginning for seconds.
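
To make the six-field format concrete, here is a small sketch of a test double that satisfies the interface and rejects specs without a leading seconds field. `recordingCron` is a hypothetical type, and its field-count check is a simplification: a real cron library may also accept other forms, such as descriptors.

```go
package main

import (
	"fmt"
	"strings"
)

// Cron is re-declared here so that the sketch compiles on its own.
type Cron interface {
	Start()
	Stop()
	AddFunc(string, func()) error
}

// recordingCron is a hypothetical stand-in that records accepted specs
// and never runs the functions it is given.
type recordingCron struct {
	specs   []string
	running bool
}

func (c *recordingCron) Start() { c.running = true }
func (c *recordingCron) Stop()  { c.running = false }

// AddFunc accepts only six space-separated fields:
// seconds, minutes, hours, day of month, month, day of week.
func (c *recordingCron) AddFunc(spec string, fn func()) error {
	if n := len(strings.Fields(spec)); n != 6 {
		return fmt.Errorf("expected 6 fields (seconds first), got %d", n)
	}
	c.specs = append(c.specs, spec)
	return nil
}

func main() {
	var c Cron = &recordingCron{}
	c.Start()
	defer c.Stop()
	fmt.Println(c.AddFunc("0 */5 * * * *", func() {})) // six fields, seconds first
	fmt.Println(c.AddFunc("*/5 * * * *", func() {}))   // standard five-field form is rejected here
}
```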

type HeadTrackable

type HeadTrackable interface {
	Connect(*models.IndexableBlockNumber) error
	Disconnect()
	OnNewHead(*models.BlockHeader)
}

HeadTrackable represents any object that wishes to respond to ethereum events, after being attached to HeadTracker.

type HeadTracker

type HeadTracker struct {
	// contains filtered or unexported fields
}

HeadTracker holds and stores the latest block number experienced by this particular node in a thread safe manner. Reconstitutes the last block number from the data store on reboot.

func NewHeadTracker

func NewHeadTracker(store *store.Store, sleepers ...utils.Sleeper) *HeadTracker

NewHeadTracker instantiates a new HeadTracker using the orm to persist new block numbers. An optional sleeper object can be passed in to dictate how often it tries to reconnect.

func (*HeadTracker) Attach

func (ht *HeadTracker) Attach(t HeadTrackable) string

Attach registers an object that will have HeadTrackable events fired on occurrence, such as Connect.

func (*HeadTracker) Detach

func (ht *HeadTracker) Detach(id string)

Detach deregisters an object from having HeadTrackable events fired.
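
The Attach and Detach pairing can be pictured as a small registry keyed by the returned string. The sketch below is not the HeadTracker implementation: `registry`, `broadcast`, and `headRecorder` are hypothetical, `blockNumber` and `blockHeader` are stand-ins for the models types, and counter-based IDs are an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// blockNumber and blockHeader are stand-ins for models.IndexableBlockNumber
// and models.BlockHeader, reduced to a single field for this sketch.
type blockNumber struct{ Number uint64 }
type blockHeader struct{ Number uint64 }

// HeadTrackable mirrors the documented interface using the stand-in types.
type HeadTrackable interface {
	Connect(*blockNumber) error
	Disconnect()
	OnNewHead(*blockHeader)
}

// registry is a hypothetical, mutex-guarded set of attached trackables.
type registry struct {
	mu         sync.Mutex
	nextID     int
	trackables map[string]HeadTrackable
}

func newRegistry() *registry {
	return &registry{trackables: map[string]HeadTrackable{}}
}

// Attach stores t and returns the ID needed to detach it later.
func (r *registry) Attach(t HeadTrackable) string {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.nextID++
	id := strconv.Itoa(r.nextID)
	r.trackables[id] = t
	return id
}

// Detach removes the trackable registered under id, if any.
func (r *registry) Detach(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.trackables, id)
}

// broadcast fires OnNewHead on every attached trackable.
func (r *registry) broadcast(h *blockHeader) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, t := range r.trackables {
		t.OnNewHead(h)
	}
}

// headRecorder is a minimal HeadTrackable that remembers head numbers.
type headRecorder struct{ heads []uint64 }

func (h *headRecorder) Connect(*blockNumber) error { return nil }
func (h *headRecorder) Disconnect()                {}
func (h *headRecorder) OnNewHead(b *blockHeader)   { h.heads = append(h.heads, b.Number) }

func main() {
	r, rec := newRegistry(), &headRecorder{}
	id := r.Attach(rec)
	r.broadcast(&blockHeader{Number: 7})
	r.Detach(id)
	r.broadcast(&blockHeader{Number: 8}) // not delivered: rec is detached
	fmt.Println("heads seen:", rec.heads)
}
```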

func (*HeadTracker) IsConnected

func (ht *HeadTracker) IsConnected() bool

IsConnected returns whether or not this HeadTracker is connected.

func (*HeadTracker) LastRecord

func (ht *HeadTracker) LastRecord() *models.IndexableBlockNumber

LastRecord returns the latest block header being tracked, or nil.

func (*HeadTracker) Save

Save updates the latest block number, if indeed the latest, and persists this number in case of reboot. Thread safe.
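
The "only if indeed the latest" rule combined with thread safety can be sketched with a mutex-guarded value. Since Save's signature is not shown above, the one used here is hypothetical, as are the `latestBlock` type and its `Last` method; persistence is reduced to a comment.

```go
package main

import (
	"fmt"
	"sync"
)

// latestBlock is a hypothetical holder for the highest block number seen.
type latestBlock struct {
	mu     sync.RWMutex
	number uint64
	set    bool
}

// Save records n only when it is newer than the stored number and reports
// whether an update happened.
func (l *latestBlock) Save(n uint64) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.set && n <= l.number {
		return false
	}
	l.number, l.set = n, true
	// A real tracker would also persist n here so that it survives a reboot.
	return true
}

// Last returns the stored number and whether one has been saved yet.
func (l *latestBlock) Last() (uint64, bool) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.number, l.set
}

func main() {
	var lb latestBlock
	var wg sync.WaitGroup
	for n := uint64(1); n <= 100; n++ {
		wg.Add(1)
		go func(n uint64) {
			defer wg.Done()
			lb.Save(n) // concurrent saves arrive in no particular order
		}(n)
	}
	wg.Wait()
	last, _ := lb.Last()
	fmt.Println("latest block:", last)
}
```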

func (*HeadTracker) Start

func (ht *HeadTracker) Start() error

Start retrieves the last persisted block number from the HeadTracker, subscribes to new heads, and if successful fires Connect on the HeadTrackable argument.

func (*HeadTracker) Stop

func (ht *HeadTracker) Stop() error

Stop unsubscribes all connections and fires Disconnect.

type InitiatorSubscription

type InitiatorSubscription struct {
	*ManagedSubscription
	Job       models.JobSpec
	Initiator models.Initiator
	// contains filtered or unexported fields
}

InitiatorSubscription encapsulates all functionality needed to wrap an ethereum subscription for use with a Chainlink Initiator. Initiator specific functionality is delegated to the callback.

func NewInitiatorSubscription

func NewInitiatorSubscription(
	initr models.Initiator,
	job models.JobSpec,
	store *store.Store,
	filter ethereum.FilterQuery,
	callback func(InitiatorSubscriptionLogEvent),
) (InitiatorSubscription, error)

NewInitiatorSubscription creates a new InitiatorSubscription that feeds received logs to the callback func parameter.

type InitiatorSubscriptionLogEvent

type InitiatorSubscriptionLogEvent struct {
	Log       types.Log
	Job       models.JobSpec
	Initiator models.Initiator
	// contains filtered or unexported fields
}

InitiatorSubscriptionLogEvent encapsulates all information as a result of a received log from an InitiatorSubscription.

func (InitiatorSubscriptionLogEvent) ContractPayment

func (le InitiatorSubscriptionLogEvent) ContractPayment() (*assets.Link, error)

ContractPayment returns the amount attached to a contract to pay the Oracle upon fulfillment.

func (InitiatorSubscriptionLogEvent) EthLogJSON

func (le InitiatorSubscriptionLogEvent) EthLogJSON() (models.JSON, error)

EthLogJSON reformats the log as JSON.

func (InitiatorSubscriptionLogEvent) ForLogger

func (le InitiatorSubscriptionLogEvent) ForLogger(kvs ...interface{}) []interface{}

ForLogger formats the InitiatorSubscriptionLogEvent for easy common formatting in logs (trace statements, not ethereum events).

func (InitiatorSubscriptionLogEvent) RunLogJSON

func (le InitiatorSubscriptionLogEvent) RunLogJSON() (models.JSON, error)

RunLogJSON extracts data from the log's topics and data specific to the format defined by RunLogs.

func (InitiatorSubscriptionLogEvent) ToDebug

func (le InitiatorSubscriptionLogEvent) ToDebug()

ToDebug prints this event via logger.Debug.

func (InitiatorSubscriptionLogEvent) ToIndexableBlockNumber

func (le InitiatorSubscriptionLogEvent) ToIndexableBlockNumber() *models.IndexableBlockNumber

ToIndexableBlockNumber returns an IndexableBlockNumber for the block of the given InitiatorSubscriptionLogEvent.

func (InitiatorSubscriptionLogEvent) ValidateRunLog

func (le InitiatorSubscriptionLogEvent) ValidateRunLog() bool

ValidateRunLog returns whether or not the contained log is a RunLog, a specific Chainlink event trigger from smart contracts.

type JobRunner

type JobRunner interface {
	Start() error
	Stop()
	// contains filtered or unexported methods
}

JobRunner safely handles coordinating job runs.

func NewJobRunner

func NewJobRunner(str *store.Store) JobRunner

NewJobRunner initializes a JobRunner.

type JobRunnerError

type JobRunnerError struct {
	// contains filtered or unexported fields
}

JobRunnerError contains the field for the error message.

func (JobRunnerError) Error

func (err JobRunnerError) Error() string

Error returns the error message for the run.

type JobSubscriber

type JobSubscriber interface {
	HeadTrackable
	AddJob(job models.JobSpec, bn *models.IndexableBlockNumber) error
	Jobs() []models.JobSpec
}

JobSubscriber listens for push notifications from the ethereum node's websocket for specific jobs.

func NewJobSubscriber

func NewJobSubscriber(store *store.Store) JobSubscriber

NewJobSubscriber returns a new job subscriber.

type JobSubscription

type JobSubscription struct {
	Job models.JobSpec
	// contains filtered or unexported fields
}

JobSubscription listens to event logs being pushed from the Ethereum Node to a job.

func StartJobSubscription

func StartJobSubscription(job models.JobSpec, head *models.IndexableBlockNumber, store *store.Store) (JobSubscription, error)

StartJobSubscription is the constructor of JobSubscription that starts listening to and keeps track of event logs corresponding to a job.

func (JobSubscription) Unsubscribe

func (js JobSubscription) Unsubscribe()

Unsubscribe stops the subscription and cleans up associated resources.

type ManagedSubscription

type ManagedSubscription struct {
	// contains filtered or unexported fields
}

ManagedSubscription encapsulates the connecting, backfilling, and clean up of an ethereum node subscription.

func NewManagedSubscription

func NewManagedSubscription(
	store *store.Store,
	filter ethereum.FilterQuery,
	callback func(types.Log),
) (*ManagedSubscription, error)

NewManagedSubscription subscribes to the ethereum node with the passed filter and delegates incoming logs to callback.

func (ManagedSubscription) Unsubscribe

func (sub ManagedSubscription) Unsubscribe()

Unsubscribe closes channels and cleans up resources.

type Nower

type Nower interface {
	Now() time.Time
}

Nower is an interface that fulfills the Now method, following the behavior of time.Now.

type OneTime

type OneTime struct {
	Store *store.Store
	Clock Afterer
	// contains filtered or unexported fields
}

OneTime represents runs that are to be executed only once.

func (*OneTime) AddJob

func (ot *OneTime) AddJob(job models.JobSpec)

AddJob runs the job at the time specified for the "runat" initiator.

func (*OneTime) RunJobAt

func (ot *OneTime) RunJobAt(initr models.Initiator, job models.JobSpec)

RunJobAt waits until the Stop() function has been called on the run or the specified time for the run is after the present time.

func (*OneTime) Start

func (ot *OneTime) Start() error

Start allocates a channel for the "done" field with an empty struct.

func (*OneTime) Stop

func (ot *OneTime) Stop()

Stop closes the "done" field's channel.
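
The "done" channel described by Start and Stop is a common Go pattern for cancelling a pending wait. The sketch below shows that pattern under assumptions: `oneTime`, `runAfter`, `realClock`, and `neverClock` are hypothetical names, and the job is a plain function, not a JobSpec.

```go
package main

import (
	"fmt"
	"time"
)

// Afterer is re-declared here so that the sketch compiles on its own.
type Afterer interface {
	After(d time.Duration) <-chan time.Time
}

// realClock delegates to time.After.
type realClock struct{}

func (realClock) After(d time.Duration) <-chan time.Time { return time.After(d) }

// neverClock returns a nil channel, which blocks forever in a select.
type neverClock struct{}

func (neverClock) After(time.Duration) <-chan time.Time { return nil }

// oneTime is a hypothetical reduction of the pattern: a clock plus a
// channel that is closed to signal shutdown.
type oneTime struct {
	clock Afterer
	done  chan struct{}
}

// Start allocates the done channel.
func (ot *oneTime) Start() { ot.done = make(chan struct{}) }

// Stop closes the done channel, releasing every pending wait.
func (ot *oneTime) Stop() { close(ot.done) }

// runAfter runs job once the wait elapses and reports true, or reports
// false if Stop is called first.
func (ot *oneTime) runAfter(wait time.Duration, job func()) bool {
	select {
	case <-ot.done:
		return false
	case <-ot.clock.After(wait):
		job()
		return true
	}
}

func main() {
	ot := &oneTime{clock: realClock{}}
	ot.Start()
	ran := ot.runAfter(10*time.Millisecond, func() { fmt.Println("job ran") })
	ot.Stop()
	fmt.Println("ran:", ran)
}
```

Closing the channel, as opposed to sending on it, wakes every goroutine waiting on it at once, which suits a shutdown signal.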

type Reaper

type Reaper interface {
	Start() error
	Stop() error
	ReapSessions()
}

Reaper interface defines the methods used to reap stale objects such as sessions.

func NewStoreReaper

func NewStoreReaper(store *store.Store) Reaper

NewStoreReaper creates a reaper that cleans stale objects from the store.

type Recurring

type Recurring struct {
	Cron  Cron
	Clock Nower
	// contains filtered or unexported fields
}

Recurring is used for runs that need to execute on a schedule, and is configured with cron. Instances of Recurring must be initialized using NewRecurring().

func NewRecurring

func NewRecurring(store *store.Store) *Recurring

NewRecurring creates a new instance of Recurring, ready to use.

func (*Recurring) AddJob

func (r *Recurring) AddJob(job models.JobSpec)

AddJob looks for "cron" initiators and adds them to cron's schedule for execution when specified.

func (*Recurring) Start

func (r *Recurring) Start() error

Start for Recurring types executes tasks with a "cron" initiator based on the configured schedule for the run.

func (*Recurring) Stop

func (r *Recurring) Stop()

Stop stops the cron scheduler and waits for running jobs to finish.

type Scheduler

type Scheduler struct {
	Recurring *Recurring
	OneTime   *OneTime
	// contains filtered or unexported fields
}

Scheduler contains fields for Recurring and OneTime for occurrences, a pointer to the store and a started field to indicate if the Scheduler has started or not.

func NewScheduler

func NewScheduler(store *store.Store) *Scheduler

NewScheduler initializes the Scheduler instances with both Recurring and OneTime fields since jobs can contain tasks which utilize both.

func (*Scheduler) AddJob

func (s *Scheduler) AddJob(job models.JobSpec)

AddJob is the governing function for Recurring and OneTime, and will only execute if the Scheduler has not already started.

func (*Scheduler) Start

func (s *Scheduler) Start() error

Start checks to ensure the Scheduler has not already started, calls the Start function for both Recurring and OneTime types, sets the started field to true, and adds jobs relevant to its initiator ("cron" and "runat").

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop is the governing function for both the Recurring and OneTime Stop functions. Sets the started field to false.

type Unsubscriber

type Unsubscriber interface {
	Unsubscribe()
}

Unsubscriber is the interface for all subscriptions, allowing one to unsubscribe.

func StartEthLogSubscription

func StartEthLogSubscription(initr models.Initiator, job models.JobSpec, head *models.IndexableBlockNumber, store *store.Store) (Unsubscriber, error)

StartEthLogSubscription starts an InitiatorSubscription tailored for use with EthLogs.

func StartRunLogSubscription

func StartRunLogSubscription(initr models.Initiator, job models.JobSpec, head *models.IndexableBlockNumber, store *store.Store) (Unsubscriber, error)

StartRunLogSubscription starts an InitiatorSubscription tailored for use with RunLogs.
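
Because both constructors return an Unsubscriber, a caller can hold different subscription kinds in one slice and clean them up uniformly. A minimal sketch of that idea follows; `countingSub` and `unsubscribeAll` are hypothetical, and no real subscription is opened.

```go
package main

import "fmt"

// Unsubscriber is re-declared here so that the sketch compiles on its own.
type Unsubscriber interface {
	Unsubscribe()
}

// countingSub is a hypothetical subscription that counts how many times
// Unsubscribe has been called across all instances sharing the counter.
type countingSub struct{ closed *int }

func (s countingSub) Unsubscribe() { *s.closed++ }

// unsubscribeAll releases every subscription in the slice.
func unsubscribeAll(subs []Unsubscriber) {
	for _, s := range subs {
		s.Unsubscribe()
	}
}

func main() {
	closed := 0
	subs := []Unsubscriber{countingSub{&closed}, countingSub{&closed}}
	unsubscribeAll(subs)
	fmt.Println("subscriptions closed:", closed)
}
```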
