job

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2021 License: ISC Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors, each naming a kind of record that does not exist: a peer
// ID, a key bundle, or a transmitter address. They are package-level values
// created with errors.New, so callers can compare against them by identity.
var (
	ErrNoSuchPeerID             = errors.New("no such peer id exists")
	ErrNoSuchKeyBundle          = errors.New("no such key bundle exists")
	ErrNoSuchTransmitterAddress = errors.New("no such transmitter address exists")
)
View Source
// ErrViolatesForeignKeyConstraint is a sentinel error whose message is
// "violates foreign key constraint".
var ErrViolatesForeignKeyConstraint = errors.New("violates foreign key constraint")

Functions

func GetORMAdvisoryLockClassID

func GetORMAdvisoryLockClassID(oi ORM) int32

func GetORMClaimedJobIDs

func GetORMClaimedJobIDs(oi ORM) (ids []int32)

func NewORM

func NewORM(db *gorm.DB, config *storm.Config, pipelineORM pipeline.ORM, eventBroadcaster postgres.EventBroadcaster, advisoryLocker postgres.AdvisoryLocker) *orm

func NewSpawner

func NewSpawner(orm ORM, config Config, jobTypeDelegates map[Type]Delegate) *spawner

func SetORMClaimedJobs

func SetORMClaimedJobs(oi ORM, jobs []SpecDB)

Types

type Config

// Config is the set of configuration accessors this package depends on.
// NewSpawner accepts any value that implements it.
//
// The accessors return two durations (a maximum database transaction duration
// and a fallback database poll interval), the database URL as a string, and a
// uint8 job pipeline parallelism value. How each value is applied is decided
// by the callers of this interface and is not visible from the declaration.
type Config interface {
	DatabaseMaximumTxDuration() time.Duration
	DatabaseURL() string
	TriggerFallbackDBPollInterval() time.Duration
	JobPipelineParallelism() uint8
}

type Delegate

// Delegate supplies per-job-type behaviour. JobType returns a Type (presumably
// the one this delegate handles — confirm against implementations), and
// ServicesForSpec returns the Services for the given spec, or an error.
// NewSpawner takes delegates as a map keyed by Type.
type Delegate interface {
	JobType() Type
	ServicesForSpec(spec SpecDB) ([]Service, error)
}

TODO(spook): I can't wait for Go generics

type DirectRequestSpec

// DirectRequestSpec holds a contract address and an on-chain job spec ID,
// together with creation and update timestamps. The embedded IDEmbed provides
// the int32 primary key. Both timestamps are excluded from TOML (`toml:"-"`)
// but are present in JSON output.
//
// NOTE(review): OnChainJobSpecID carries no json/toml tags, unlike the other
// fields — confirm that the default field-name encoding is intended.
type DirectRequestSpec struct {
	IDEmbed
	ContractAddress models.EIP55Address `json:"contractAddress" toml:"contractAddress"`
	// OnChainJobSpecID is the sha256 of the TOML that created this job spec
	OnChainJobSpecID gethCommon.Hash
	CreatedAt        time.Time `json:"createdAt" toml:"-"`
	UpdatedAt        time.Time `json:"updatedAt" toml:"-"`
}

func (DirectRequestSpec) TableName

func (DirectRequestSpec) TableName() string

type FluxMonitorSpec

// FluxMonitorSpec holds a contract address, precision and threshold settings,
// poll and idle timer settings (a period plus a "disabled" flag for each), and
// creation and update timestamps. The embedded IDEmbed provides the int32
// primary key. Both timestamps are excluded from TOML (`toml:"-"`).
//
// NOTE(review): Precision is an int32 mapped to a smallint column, so values
// outside the 16-bit range presumably cannot be stored — confirm the valid
// range. The four timer fields declare gorm type jsonb although their Go types
// are time.Duration and bool — confirm against the table schema.
type FluxMonitorSpec struct {
	IDEmbed
	ContractAddress models.EIP55Address `json:"contractAddress" toml:"contractAddress"`
	Precision       int32               `json:"precision,omitempty" gorm:"type:smallint"`
	Threshold       float32             `json:"threshold,omitempty"`
	// AbsoluteThreshold is the maximum absolute change allowed in a fluxmonitored
	// value before a new round should be kicked off, so that the current value
	// can be reported on-chain.
	AbsoluteThreshold float32       `json:"absoluteThreshold" gorm:"type:float;not null"`
	PollTimerPeriod   time.Duration `json:"pollTimerPeriod,omitempty" gorm:"type:jsonb"`
	PollTimerDisabled bool          `json:"pollTimerDisabled,omitempty" gorm:"type:jsonb"`
	IdleTimerPeriod   time.Duration `json:"idleTimerPeriod,omitempty" gorm:"type:jsonb"`
	IdleTimerDisabled bool          `json:"idleTimerDisabled,omitempty" gorm:"type:jsonb"`
	CreatedAt         time.Time     `json:"createdAt" toml:"-"`
	UpdatedAt         time.Time     `json:"updatedAt" toml:"-"`
}

type IDEmbed

// IDEmbed is embedded by the spec types in this package to give them an int32
// ID that gorm treats as the primary key. The field is omitted from both JSON
// and TOML encoding; the GetID and SetID methods work with a string form
// instead.
type IDEmbed struct {
	ID int32 `json:"-" toml:"-"                 gorm:"primary_key"`
}

func (IDEmbed) GetID

func (id IDEmbed) GetID() string

func (*IDEmbed) SetID

func (id *IDEmbed) SetID(value string) error

type ORM

// ORM is the persistence interface for jobs. Its methods cover:
//   - subscriptions: ListenForNewJobs and ListenForDeletedJobs each return a
//     postgres.Subscription;
//   - claiming: ClaimUnclaimedJobs, UnclaimJob and CheckForDeletedJobs;
//   - job access: CreateJob, JobsV2, FindJob and DeleteJob;
//   - RecordError, which takes a job ID and a description and returns nothing,
//     so callers cannot observe a failure to record;
//   - PipelineRunsByJobID, which takes offset and size arguments and returns
//     an int alongside the runs (presumably a total count — confirm);
//   - Close.
//
// Jobs are identified by int32 IDs throughout. JobsV2, FindJob and
// PipelineRunsByJobID do not take a context, unlike the other data methods.
type ORM interface {
	ListenForNewJobs() (postgres.Subscription, error)
	ListenForDeletedJobs() (postgres.Subscription, error)
	ClaimUnclaimedJobs(ctx context.Context) ([]SpecDB, error)
	CreateJob(ctx context.Context, jobSpec *SpecDB, taskDAG pipeline.TaskDAG) error
	JobsV2() ([]SpecDB, error)
	FindJob(id int32) (SpecDB, error)
	DeleteJob(ctx context.Context, id int32) error
	RecordError(ctx context.Context, jobID int32, description string)
	UnclaimJob(ctx context.Context, id int32) error
	CheckForDeletedJobs(ctx context.Context) (deletedJobIDs []int32, err error)
	Close() error
	PipelineRunsByJobID(jobID int32, offset, size int) ([]pipeline.Run, int, error)
}

type OffchainReportingOracleSpec

// OffchainReportingOracleSpec holds a contract address, P2P settings (peer ID,
// bootstrap peers, and a bootstrap-peer flag), an encrypted OCR key bundle ID,
// a transmitter address, several timeout and interval settings, a confirmation
// count, and creation and update timestamps. The embedded IDEmbed provides
// the int32 primary key.
//
// P2PPeerID, EncryptedOCRKeyBundleID and TransmitterAddress are pointers and
// may therefore be nil. EncryptedOCRKeyBundleID is encoded under the key
// "keyBundleID" in both JSON and TOML. Several columns declare `default:null`
// in their gorm tags.
//
// NOTE(review): ContractConfigTrackerSubscribeInterval omits the `type:bigint`
// that the other models.Interval fields declare — confirm this is intended.
type OffchainReportingOracleSpec struct {
	IDEmbed
	ContractAddress                        models.EIP55Address  `json:"contractAddress" toml:"contractAddress"`
	P2PPeerID                              *models.PeerID       `json:"p2pPeerID" toml:"p2pPeerID" gorm:"column:p2p_peer_id;default:null"`
	P2PBootstrapPeers                      pq.StringArray       `json:"p2pBootstrapPeers" toml:"p2pBootstrapPeers" gorm:"column:p2p_bootstrap_peers;type:text[]"`
	IsBootstrapPeer                        bool                 `json:"isBootstrapPeer" toml:"isBootstrapPeer"`
	EncryptedOCRKeyBundleID                *models.Sha256Hash   `json:"keyBundleID" toml:"keyBundleID"                 gorm:"type:bytea"`
	TransmitterAddress                     *models.EIP55Address `json:"transmitterAddress" toml:"transmitterAddress"`
	ObservationTimeout                     models.Interval      `json:"observationTimeout" toml:"observationTimeout" gorm:"type:bigint;default:null"`
	BlockchainTimeout                      models.Interval      `json:"blockchainTimeout" toml:"blockchainTimeout" gorm:"type:bigint;default:null"`
	ContractConfigTrackerSubscribeInterval models.Interval      `json:"contractConfigTrackerSubscribeInterval" toml:"contractConfigTrackerSubscribeInterval" gorm:"default:null"`
	ContractConfigTrackerPollInterval      models.Interval      `json:"contractConfigTrackerPollInterval" toml:"contractConfigTrackerPollInterval" gorm:"type:bigint;default:null"`
	ContractConfigConfirmations            uint16               `json:"contractConfigConfirmations" toml:"contractConfigConfirmations"`
	CreatedAt                              time.Time            `json:"createdAt" toml:"-"`
	UpdatedAt                              time.Time            `json:"updatedAt" toml:"-"`
}

TODO: remove pointers when upgrading to gorm v2, in which https://github.com/go-gorm/gorm/issues/2748 is fixed.

func (*OffchainReportingOracleSpec) BeforeCreate

func (s *OffchainReportingOracleSpec) BeforeCreate(db *gorm.DB) error

func (*OffchainReportingOracleSpec) BeforeSave

func (s *OffchainReportingOracleSpec) BeforeSave(db *gorm.DB) error

func (OffchainReportingOracleSpec) GetID

func (*OffchainReportingOracleSpec) SetID

func (s *OffchainReportingOracleSpec) SetID(value string) error

func (OffchainReportingOracleSpec) TableName

func (OffchainReportingOracleSpec) TableName() string

type PipelineRun

// PipelineRun carries a single int64 ID, which gorm treats as the primary key
// and which is omitted from JSON encoding. Its GetID and SetID methods work
// with a string form of the identifier.
type PipelineRun struct {
	ID int64 `json:"-" gorm:"primary_key"`
}

func (PipelineRun) GetID

func (pr PipelineRun) GetID() string

func (*PipelineRun) SetID

func (pr *PipelineRun) SetID(value string) error

type Service

// Service is anything that can be started and closed; either operation may
// return an error. Delegate.ServicesForSpec returns a slice of Services for a
// job spec.
type Service interface {
	Start() error
	Close() error
}

type Spawner

// Spawner has the same Start and Close methods as Service, plus job
// management: CreateJob takes a spec and a nullable name and returns an int32
// (presumably the new job's ID — confirm) or an error, and DeleteJob removes
// a job by its int32 ID.
type Spawner interface {
	Start() error
	Close() error
	CreateJob(ctx context.Context, spec SpecDB, name null.String) (int32, error)
	DeleteJob(ctx context.Context, jobID int32) error
}

The job spawner manages the spinning up and spinning down of the long-running services that perform the work described by job specs. Each active job spec has one or more of these services associated with it.

At present, Flux Monitor and Offchain Reporting jobs can only have a single "initiator", meaning that they only require a single service. But the older "direct request" model allows for multiple initiators, which imply multiple services.

type SpecDB

// SpecDB is the top-level job record. The embedded IDEmbed provides the int32
// primary key. It holds optional (pointer) references to one of three
// type-specific specs — offchain reporting, direct request, and flux monitor —
// each paired with a nullable *int32 ID field, plus a pipeline spec with a
// non-nullable int32 ID. The ID fields are omitted from JSON.
//
// JobSpecErrors is associated through the JobID foreign key and is encoded in
// JSON under "errors". Pipeline is ignored by gorm (`gorm:"-"`), omitted from
// JSON, and read from the TOML key "observationSource".
//
// NOTE(review): the JSON key "DirectRequestSpec" is capitalised, unlike the
// sibling keys "offChainReportingOracleSpec" and "fluxMonitorSpec". Changing
// it would alter the wire format, so confirm with API consumers first.
type SpecDB struct {
	IDEmbed
	OffchainreportingOracleSpecID *int32                       `json:"-"`
	OffchainreportingOracleSpec   *OffchainReportingOracleSpec `json:"offChainReportingOracleSpec"`
	DirectRequestSpecID           *int32                       `json:"-"`
	DirectRequestSpec             *DirectRequestSpec           `json:"DirectRequestSpec"`
	FluxMonitorSpecID             *int32                       `json:"-"`
	FluxMonitorSpec               *FluxMonitorSpec             `json:"fluxMonitorSpec"`
	PipelineSpecID                int32                        `json:"-"`
	PipelineSpec                  *pipeline.Spec               `json:"pipelineSpec"`
	JobSpecErrors                 []SpecError                  `json:"errors" gorm:"foreignKey:JobID"`
	Type                          Type                         `json:"type"`
	SchemaVersion                 uint32                       `json:"schemaVersion"`
	Name                          null.String                  `json:"name"`
	MaxTaskDuration               models.Interval              `json:"maxTaskDuration"`
	Pipeline                      pipeline.TaskDAG             `json:"-" toml:"observationSource" gorm:"-"`
}

func GetORMClaimedJobs

func GetORMClaimedJobs(oi ORM) (claimedJobs []SpecDB)

func (SpecDB) TableName

func (SpecDB) TableName() string

type SpecError

// SpecError is an error record attached to a job. It has its own int64
// primary key, the int32 JobID of the job it belongs to (omitted from JSON), a
// description, an occurrence counter, and creation and update timestamps.
// SpecDB.JobSpecErrors references these records through the JobID foreign key.
type SpecError struct {
	ID          int64     `json:"id" gorm:"primary_key"`
	JobID       int32     `json:"-"`
	Description string    `json:"description"`
	Occurrences uint      `json:"occurrences"`
	CreatedAt   time.Time `json:"createdAt"`
	UpdatedAt   time.Time `json:"updatedAt"`
}

func (SpecError) TableName

func (SpecError) TableName() string

type Type

// Type names the kind of job a spec describes. Its underlying type is string.
type Type string

// The job types declared by this package.
const (
	DirectRequest     = Type("directrequest")
	FluxMonitor       = Type("fluxmonitor")
	OffchainReporting = Type("offchainreporting")
)

func (Type) String

func (t Type) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL