jobs

package
v1.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 5, 2024 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ErrorHandlerReRun   = "rerun"
	ErrorHandlerReQueue = "requeue"
	ErrorHandlerLog     = "log"
)
View Source
const (
	TriggerTypeCron     = "cron"
	TriggerTypeOnChange = "onchange"
	JobTypeFull         = "fullsync"
	JobTypeIncremental  = "incremental"
)
View Source
const HelperJavascriptFunctions = `` /* 3000-byte string literal not displayed */

These are upper-cased to make it less likely that the user accidentally redefines them (this is not a real safeguard, but it may help).

Variables

View Source
var MaxItemsExceededError = errors.New("errorHandler: max items reached")

Functions

func NewRaffle

func NewRaffle(
	ticketsFull int,
	ticketsIncr int,
	logger *zap.SugaredLogger,
	statsdClient statsd.ClientInterface,
) *raffle

Types

type EgdmNamespaceManagerShim added in v1.8.6

type EgdmNamespaceManagerShim struct {
	// contains filtered or unexported fields
}

func (EgdmNamespaceManagerShim) AsContext added in v1.8.6

func (e EgdmNamespaceManagerShim) AsContext() *egdm.Context

func (EgdmNamespaceManagerShim) AssertPrefixedIdentifierFromURI added in v1.8.6

func (e EgdmNamespaceManagerShim) AssertPrefixedIdentifierFromURI(value string) (string, error)

func (EgdmNamespaceManagerShim) DoesExpansionExistForPrefix added in v1.11.0

func (e EgdmNamespaceManagerShim) DoesExpansionExistForPrefix(prefix string) bool

func (EgdmNamespaceManagerShim) GetFullURI added in v1.8.6

func (e EgdmNamespaceManagerShim) GetFullURI(value string) (string, error)

func (EgdmNamespaceManagerShim) GetNamespaceExpansionForPrefix added in v1.8.6

func (e EgdmNamespaceManagerShim) GetNamespaceExpansionForPrefix(prefix string) (string, error)

func (EgdmNamespaceManagerShim) GetNamespaceMappings added in v1.8.6

func (e EgdmNamespaceManagerShim) GetNamespaceMappings() map[string]string

func (EgdmNamespaceManagerShim) GetPrefixForExpansion added in v1.8.6

func (e EgdmNamespaceManagerShim) GetPrefixForExpansion(expansion string) (string, error)

func (EgdmNamespaceManagerShim) GetPrefixedIdentifier added in v1.8.6

func (e EgdmNamespaceManagerShim) GetPrefixedIdentifier(value string) (string, error)

func (EgdmNamespaceManagerShim) IsFullUri added in v1.8.6

func (e EgdmNamespaceManagerShim) IsFullUri(value string) bool

func (EgdmNamespaceManagerShim) StorePrefixExpansionMapping added in v1.8.6

func (e EgdmNamespaceManagerShim) StorePrefixExpansionMapping(prefix string, expansion string)

type ErrorHandler

type ErrorHandler struct {
	Type       string `json:"errorHandler"` // rerun, requeue, log
	MaxRetries int    `json:"maxRetries"`   // default 1
	RetryDelay int64  `json:"retryDelay"`   // seconds, default 30
	MaxItems   int    `json:"maxItems"`     // 0 = all
	// contains filtered or unexported fields
}

type ErrorHandlers

type ErrorHandlers []*ErrorHandler

type FullSyncPipeline

type FullSyncPipeline struct{ PipelineSpec }

type HTTPTransform

type HTTPTransform struct {
	URL              string
	Authentication   string                   // "none, basic, token"
	User             string                   // for use in basic auth
	Password         string                   // for use in basic auth
	TokenProvider    string                   // for use in token auth
	TimeOut          float64                  // set timeout for http-transform
	SupportContext   bool                     // indicates if this transform supports context
	NamespaceManager *server.NamespaceManager // the store
}

func (*HTTPTransform) EndStoreContext added in v1.11.0

func (httpTransform *HTTPTransform) EndStoreContext(string) error

func (*HTTPTransform) GetConfig

func (httpTransform *HTTPTransform) GetConfig() map[string]interface{}

type IncrementalPipeline

type IncrementalPipeline struct{ PipelineSpec }

type JavascriptTransform

type JavascriptTransform struct {
	Store   *server.Store
	Code    []byte
	Runtime *goja.Runtime
	Logger  *zap.SugaredLogger

	Parallelism       int
	QueryResultWriter QueryResultWriter
	DatasetManager    *server.DsManager
	// contains filtered or unexported fields
}

func NewJavascriptTransform

func NewJavascriptTransform(
	log *zap.SugaredLogger,
	code64 string,
	store *server.Store,
	dsm *server.DsManager,
) (*JavascriptTransform, error)

func (*JavascriptTransform) AsEntity

func (javascriptTransform *JavascriptTransform) AsEntity(val interface{}) (res *server.Entity)

func (*JavascriptTransform) AssertNamespacePrefix

func (javascriptTransform *JavascriptTransform) AssertNamespacePrefix(urlExpansion string) string

func (*JavascriptTransform) BuildEntities added in v1.9.0

func (javascriptTransform *JavascriptTransform) BuildEntities(params map[string]any, since string, limit int, emit func(entity *server.Entity) error) (string, error)

func (*JavascriptTransform) ByID

func (javascriptTransform *JavascriptTransform) ByID(entityID string, datasets []string) *server.Entity

func (*JavascriptTransform) Clone

func (javascriptTransform *JavascriptTransform) Clone() (*JavascriptTransform, error)

Clone clones the transform for use in parallel processing.

func (*JavascriptTransform) DatasetChanges

func (javascriptTransform *JavascriptTransform) DatasetChanges(
	datasetName string,
	since uint64,
	limit int,
) (*server.Changes, error)

func (*JavascriptTransform) EndStoreContext added in v1.11.0

func (javascriptTransform *JavascriptTransform) EndStoreContext(id string) error

func (*JavascriptTransform) ExecuteQuery

func (javascriptTransform *JavascriptTransform) ExecuteQuery(resultWriter QueryResultWriter) (er error)

func (*JavascriptTransform) ExecuteTransaction

func (javascriptTransform *JavascriptTransform) ExecuteTransaction(txn *server.Transaction) error

func (*JavascriptTransform) GetConfig

func (javascriptTransform *JavascriptTransform) GetConfig() map[string]interface{}

func (*JavascriptTransform) GetNamespacePrefix

func (javascriptTransform *JavascriptTransform) GetNamespacePrefix(urlExpansion string) string

func (*JavascriptTransform) Log

func (javascriptTransform *JavascriptTransform) Log(thing interface{}, logLevel string)

func (*JavascriptTransform) MakeEntityArray

func (javascriptTransform *JavascriptTransform) MakeEntityArray(entities []interface{}) []*server.Entity

func (*JavascriptTransform) NewEntity

func (javascriptTransform *JavascriptTransform) NewEntity() *server.Entity

func (*JavascriptTransform) NewTransaction

func (javascriptTransform *JavascriptTransform) NewTransaction() *server.Transaction

func (*JavascriptTransform) PagedQuery

func (javascriptTransform *JavascriptTransform) PagedQuery(
	query PagedQueryParams,
	pageSize int,
	forEach func(result []server.RelatedEntityResult) bool,
) []*server.RelatedFrom

func (*JavascriptTransform) Query

func (javascriptTransform *JavascriptTransform) Query(
	startingEntities []string,
	predicate string,
	inverse bool,
	datasets []string,
) [][]interface{}

func (*JavascriptTransform) Timing

func (javascriptTransform *JavascriptTransform) Timing(name string, end bool)

func (*JavascriptTransform) ToString

func (javascriptTransform *JavascriptTransform) ToString(obj interface{}) string

func (*JavascriptTransform) UUID

func (javascriptTransform *JavascriptTransform) UUID() string

func (*JavascriptTransform) WriteQueryResult

func (javascriptTransform *JavascriptTransform) WriteQueryResult(object any) error

type JobConfiguration

type JobConfiguration struct {
	ID          string                 `json:"id"`
	Title       string                 `json:"title"`
	Description string                 `json:"description"`
	Tags        []string               `json:"tags"`
	Source      map[string]interface{} `json:"source"`
	Sink        map[string]interface{} `json:"sink"`
	Transform   map[string]interface{} `json:"transform"`
	Triggers    []JobTrigger           `json:"triggers"`
	Paused      bool                   `json:"paused"`
	BatchSize   int                    `json:"batchSize"`
}

JobConfiguration is the external-facing object used to configure a job. It is also the object that gets persisted in the store.

type JobStatus

type JobStatus struct {
	JobID    string    `json:"jobId"`
	JobTitle string    `json:"jobTitle"`
	Started  time.Time `json:"started"`
}

type JobTrigger

type JobTrigger struct {
	TriggerType      string        `json:"triggerType"`
	JobType          string        `json:"jobType"`
	Schedule         string        `json:"schedule"`
	MonitoredDataset string        `json:"monitoredDataset"`
	ErrorHandlers    ErrorHandlers `json:"onError"`
}

type LogFailingEntityHandler

type LogFailingEntityHandler struct {
	MaxItems int
	// contains filtered or unexported fields
}

type PagedQueryParams

type PagedQueryParams struct {
	StartURIs     []string
	Via           string
	Inverse       bool
	Datasets      []string
	Continuations []*server.RelatedFrom
}

type Pipeline

type Pipeline interface {
	// contains filtered or unexported methods
}

type PipelineSpec

type PipelineSpec struct {
	// contains filtered or unexported fields
}

type QueryResultWriter

type QueryResultWriter interface {
	WriteObject(object any) error
}

type ReQueueFailingEntityHandler

type ReQueueFailingEntityHandler struct {
	MaxItems int
	// contains filtered or unexported fields
}

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

The Runner organizes and keeps track of configured jobs. It is also responsible for running those jobs, and it logs everything it is doing in a human-friendly format. The Runner should only be interacted with from the Scheduler.

func NewRunner

func NewRunner(
	env *conf.Config,
	store *server.Store,
	tokenProviders *security.TokenProviders,
	eb server.EventBus,
	statsdClient statsd.ClientInterface,
) *Runner

NewRunner creates a new job runner. It should only be used from main.go.

func (*Runner) Stop

func (runner *Runner) Stop()

Stop calls the cron stop method to stop all future scheduled jobs. It will also go through the list of running jobs and cancel them.

type ScheduleEntries

type ScheduleEntries struct {
	Entries []ScheduleEntry `json:"entries"`
}

type ScheduleEntry

type ScheduleEntry struct {
	ID       int       `json:"id"`
	JobID    string    `json:"jobId"`
	JobTitle string    `json:"jobTitle"`
	Next     time.Time `json:"next"`
	Prev     time.Time `json:"prev"`
}

type Scheduler

type Scheduler struct {
	Logger         *zap.SugaredLogger
	Store          *server.Store
	Runner         *Runner
	DatasetManager *server.DsManager
}

The Scheduler deals with reading and writing jobs and making sure they get added to the job Runner. It also deals with translating between the external JobConfiguration and the internal job format.

func NewScheduler

func NewScheduler(
	env *conf.Config,
	store *server.Store,
	dsm *server.DsManager,
	runner *Runner,
) *Scheduler

NewScheduler returns a new Scheduler. When started, it will load all existing JobConfigurations from the store and schedule them with the runner.

func (*Scheduler) AddJob

func (s *Scheduler) AddJob(jobConfig *JobConfiguration) error

AddJob takes an incoming JobConfiguration and stores it in the store. Once it has stored it, it transforms it into a Pipeline and adds it to the scheduler. It is important that jobs are valid, so care is taken to validate the JobConfiguration before it can be scheduled.

func (*Scheduler) DeleteJob

func (s *Scheduler) DeleteJob(jobID string) error

DeleteJob deletes a JobConfiguration and calls out to the Runner to make sure it also gets removed from the running jobs. It will attempt to load the job before it deletes it, to validate its existence.

func (*Scheduler) GetJobHistory

func (s *Scheduler) GetJobHistory() []*jobResult

GetJobHistory returns a list of history for all jobs that have ever been run on the server. In the future, this may only return the history of the currently registered jobs. Each job stores its start and end times, together with the last error, if any.

func (*Scheduler) GetJobState

func (s *Scheduler) GetJobState(id string) (*SyncJobState, error)

func (*Scheduler) GetRunningJob

func (s *Scheduler) GetRunningJob(jobid string) *JobStatus

GetRunningJob gets the status for a single running job. This can be used to see if a job is still running, and is currently used by the CLI to follow a job run operation.

func (*Scheduler) GetRunningJobs

func (s *Scheduler) GetRunningJobs() []JobStatus

GetRunningJobs gets the status for all running jobs. It can be used to see what the job system is currently doing.

func (*Scheduler) GetScheduleEntries

func (s *Scheduler) GetScheduleEntries() ScheduleEntries

GetScheduleEntries returns a list of all cron entries that are currently scheduled. Paused jobs are not part of this list.

func (*Scheduler) KillJob

func (s *Scheduler) KillJob(jobid string)

KillJob will stop a job that is currently running. If the job is not running, it will do nothing. If the job that is running is RunOnce, then it will be deleted afterwards.

func (*Scheduler) ListJobs

func (s *Scheduler) ListJobs() []*JobConfiguration

ListJobs returns a list of all stored configurations.

func (*Scheduler) LoadJob

func (s *Scheduler) LoadJob(jobID string) (*JobConfiguration, error)

LoadJob will attempt to load a JobConfiguration based on a jobId. Because of how the GetObject method currently works, it will not return nil when the job is not found, but an empty jobConfig object.

func (*Scheduler) MultiSourceCodeRegistration

func (s *Scheduler) MultiSourceCodeRegistration(code64 string, reg source.DependencyRegistry) error

func (*Scheduler) Parse

func (s *Scheduler) Parse(rawJSON []byte) (*JobConfiguration, error)

Parse is a convenience method to parse raw config JSON into a JobConfiguration.

func (*Scheduler) PauseJob

func (s *Scheduler) PauseJob(jobid string) error

PauseJob pauses a job. It will not stop a running job, but it will prevent the job from running on the next schedule.

func (*Scheduler) ResetJob

func (s *Scheduler) ResetJob(jobid string, since string) error

ResetJob resets the job's since token. This allows the job to be rerun from the beginning.

func (*Scheduler) RunJob

func (s *Scheduler) RunJob(jobid string, jobType string) (string, error)

RunJob runs an existing job, if it is not already running. It does so by adding a temporary job to the scheduler, without saving it. The temporary job is added with the RunOnce flag set to true.

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context) error

func (*Scheduler) Stop

func (s *Scheduler) Stop(ctx context.Context) error

func (*Scheduler) UnpauseJob

func (s *Scheduler) UnpauseJob(jobid string) error

UnpauseJob resumes a paused job. It will not run the job immediately; however, it will add it to the scheduler so that it can be run on the next schedule.

type Sink

type Sink interface {
	GetConfig() map[string]interface{}
	// contains filtered or unexported methods
}

Sink defines the interface for destinations that data can be pushed to.

type SyncJobState

type SyncJobState struct {
	ID                 string `json:"id"`
	ContinuationToken  string `json:"token"`
	LastRunCompletedOk bool   `json:"lastrunok"`
	LastRunError       string `json:"lastrunerror"`
}

SyncJobState is used to capture the state of a running job.

type Transform

type Transform interface {
	GetConfig() map[string]interface{}

	EndStoreContext(string) error
	// contains filtered or unexported methods
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL