Documentation ¶
Index ¶
- Constants
- Variables
- func NewRaffle(ticketsFull int, ticketsIncr int, logger *zap.SugaredLogger, ...) *raffle
- type EgdmNamespaceManagerShim
- func (e EgdmNamespaceManagerShim) AsContext() *egdm.Context
- func (e EgdmNamespaceManagerShim) AssertPrefixedIdentifierFromURI(value string) (string, error)
- func (e EgdmNamespaceManagerShim) DoesExpansionExistForPrefix(prefix string) bool
- func (e EgdmNamespaceManagerShim) GetFullURI(value string) (string, error)
- func (e EgdmNamespaceManagerShim) GetNamespaceExpansionForPrefix(prefix string) (string, error)
- func (e EgdmNamespaceManagerShim) GetNamespaceMappings() map[string]string
- func (e EgdmNamespaceManagerShim) GetPrefixForExpansion(expansion string) (string, error)
- func (e EgdmNamespaceManagerShim) GetPrefixedIdentifier(value string) (string, error)
- func (e EgdmNamespaceManagerShim) IsFullUri(value string) bool
- func (e EgdmNamespaceManagerShim) StorePrefixExpansionMapping(prefix string, expansion string)
- type ErrorHandler
- type ErrorHandlers
- type FullSyncPipeline
- type HTTPTransform
- type IncrementalPipeline
- type JavascriptTransform
- func (javascriptTransform *JavascriptTransform) AsEntity(val interface{}) (res *server.Entity)
- func (javascriptTransform *JavascriptTransform) AssertNamespacePrefix(urlExpansion string) string
- func (javascriptTransform *JavascriptTransform) BuildEntities(params map[string]any, since string, limit int, ...) (string, error)
- func (javascriptTransform *JavascriptTransform) ByID(entityID string, datasets []string) *server.Entity
- func (javascriptTransform *JavascriptTransform) Clone() (*JavascriptTransform, error)
- func (javascriptTransform *JavascriptTransform) DatasetChanges(datasetName string, since uint64, limit int) (*server.Changes, error)
- func (javascriptTransform *JavascriptTransform) EndStoreContext(id string) error
- func (javascriptTransform *JavascriptTransform) ExecuteQuery(resultWriter QueryResultWriter) (er error)
- func (javascriptTransform *JavascriptTransform) ExecuteTransaction(txn *server.Transaction) error
- func (javascriptTransform *JavascriptTransform) GetConfig() map[string]interface{}
- func (javascriptTransform *JavascriptTransform) GetNamespacePrefix(urlExpansion string) string
- func (javascriptTransform *JavascriptTransform) Log(thing interface{}, logLevel string)
- func (javascriptTransform *JavascriptTransform) MakeEntityArray(entities []interface{}) []*server.Entity
- func (javascriptTransform *JavascriptTransform) NewEntity() *server.Entity
- func (javascriptTransform *JavascriptTransform) NewTransaction() *server.Transaction
- func (javascriptTransform *JavascriptTransform) PagedQuery(query PagedQueryParams, pageSize int, ...) []*server.RelatedFrom
- func (javascriptTransform *JavascriptTransform) Query(startingEntities []string, predicate string, inverse bool, datasets []string) [][]interface{}
- func (javascriptTransform *JavascriptTransform) Timing(name string, end bool)
- func (javascriptTransform *JavascriptTransform) ToString(obj interface{}) string
- func (javascriptTransform *JavascriptTransform) UUID() string
- func (javascriptTransform *JavascriptTransform) WriteQueryResult(object any) error
- type JobConfiguration
- type JobStatus
- type JobTrigger
- type LogFailingEntityHandler
- type PagedQueryParams
- type Pipeline
- type PipelineSpec
- type QueryResultWriter
- type ReQueueFailingEntityHandler
- type Runner
- type ScheduleEntries
- type ScheduleEntry
- type Scheduler
- func (s *Scheduler) AddJob(jobConfig *JobConfiguration) error
- func (s *Scheduler) DeleteJob(jobID string) error
- func (s *Scheduler) GetJobHistory() []*jobResult
- func (s *Scheduler) GetJobState(id string) (*SyncJobState, error)
- func (s *Scheduler) GetRunningJob(jobid string) *JobStatus
- func (s *Scheduler) GetRunningJobs() []JobStatus
- func (s *Scheduler) GetScheduleEntries() ScheduleEntries
- func (s *Scheduler) KillJob(jobid string)
- func (s *Scheduler) ListJobs() []*JobConfiguration
- func (s *Scheduler) LoadJob(jobID string) (*JobConfiguration, error)
- func (s *Scheduler) MultiSourceCodeRegistration(code64 string, reg source.DependencyRegistry) error
- func (s *Scheduler) Parse(rawJSON []byte) (*JobConfiguration, error)
- func (s *Scheduler) PauseJob(jobid string) error
- func (s *Scheduler) ResetJob(jobid string, since string) error
- func (s *Scheduler) RunJob(jobid string, jobType string) (string, error)
- func (s *Scheduler) Start(ctx context.Context) error
- func (s *Scheduler) Stop(ctx context.Context) error
- func (s *Scheduler) UnpauseJob(jobid string) error
- type Sink
- type SyncJobState
- type Transform
Constants ¶
const ( ErrorHandlerReRun = "rerun" ErrorHandlerReQueue = "requeue" ErrorHandlerLog = "log" )
const ( TriggerTypeCron = "cron" TriggerTypeOnChange = "onchange" JobTypeFull = "fullsync" JobTypeIncremental = "incremental" )
const HelperJavascriptFunctions = `` /* 3000-byte string literal not displayed */
These are upper-cased to prevent the user from accidentally redefining them (not really a guarantee, but maybe it will help).
Variables ¶
var ( TriggerTypes = map[string]bool{TriggerTypeOnChange: true, TriggerTypeCron: true} JobTypes = map[string]bool{JobTypeFull: true, JobTypeIncremental: true} )
var ErrorHandlerTypes = map[string]bool{ErrorHandlerReRun: true, ErrorHandlerReQueue: true, ErrorHandlerLog: true}
var MaxItemsExceededError = errors.New("errorHandler: max items reached")
Functions ¶
func NewRaffle ¶
func NewRaffle( ticketsFull int, ticketsIncr int, logger *zap.SugaredLogger, statsdClient statsd.ClientInterface, ) *raffle
Types ¶
type EgdmNamespaceManagerShim ¶ added in v1.8.6
type EgdmNamespaceManagerShim struct {
// contains filtered or unexported fields
}
func (EgdmNamespaceManagerShim) AsContext ¶ added in v1.8.6
func (e EgdmNamespaceManagerShim) AsContext() *egdm.Context
func (EgdmNamespaceManagerShim) AssertPrefixedIdentifierFromURI ¶ added in v1.8.6
func (e EgdmNamespaceManagerShim) AssertPrefixedIdentifierFromURI(value string) (string, error)
func (EgdmNamespaceManagerShim) DoesExpansionExistForPrefix ¶ added in v1.11.0
func (e EgdmNamespaceManagerShim) DoesExpansionExistForPrefix(prefix string) bool
func (EgdmNamespaceManagerShim) GetFullURI ¶ added in v1.8.6
func (e EgdmNamespaceManagerShim) GetFullURI(value string) (string, error)
func (EgdmNamespaceManagerShim) GetNamespaceExpansionForPrefix ¶ added in v1.8.6
func (e EgdmNamespaceManagerShim) GetNamespaceExpansionForPrefix(prefix string) (string, error)
func (EgdmNamespaceManagerShim) GetNamespaceMappings ¶ added in v1.8.6
func (e EgdmNamespaceManagerShim) GetNamespaceMappings() map[string]string
func (EgdmNamespaceManagerShim) GetPrefixForExpansion ¶ added in v1.8.6
func (e EgdmNamespaceManagerShim) GetPrefixForExpansion(expansion string) (string, error)
func (EgdmNamespaceManagerShim) GetPrefixedIdentifier ¶ added in v1.8.6
func (e EgdmNamespaceManagerShim) GetPrefixedIdentifier(value string) (string, error)
func (EgdmNamespaceManagerShim) IsFullUri ¶ added in v1.8.6
func (e EgdmNamespaceManagerShim) IsFullUri(value string) bool
func (EgdmNamespaceManagerShim) StorePrefixExpansionMapping ¶ added in v1.8.6
func (e EgdmNamespaceManagerShim) StorePrefixExpansionMapping(prefix string, expansion string)
type ErrorHandler ¶
type ErrorHandlers ¶
type ErrorHandlers []*ErrorHandler
type FullSyncPipeline ¶
type FullSyncPipeline struct{ PipelineSpec }
type HTTPTransform ¶
type HTTPTransform struct { URL string Authentication string // "none, basic, token" User string // for use in basic auth Password string // for use in basic auth TokenProvider string // for use in token auth TimeOut float64 // set timeout for http-transform SupportContext bool // indicates if this transform supports context NamespaceManager *server.NamespaceManager // the store }
func (*HTTPTransform) EndStoreContext ¶ added in v1.11.0
func (httpTransform *HTTPTransform) EndStoreContext(string) error
func (*HTTPTransform) GetConfig ¶
func (httpTransform *HTTPTransform) GetConfig() map[string]interface{}
type IncrementalPipeline ¶
type IncrementalPipeline struct{ PipelineSpec }
type JavascriptTransform ¶
type JavascriptTransform struct { Store *server.Store Code []byte Runtime *goja.Runtime Logger *zap.SugaredLogger Parallelism int QueryResultWriter QueryResultWriter DatasetManager *server.DsManager // contains filtered or unexported fields }
func NewJavascriptTransform ¶
func NewJavascriptTransform( log *zap.SugaredLogger, code64 string, store *server.Store, dsm *server.DsManager, ) (*JavascriptTransform, error)
func (*JavascriptTransform) AsEntity ¶
func (javascriptTransform *JavascriptTransform) AsEntity(val interface{}) (res *server.Entity)
func (*JavascriptTransform) AssertNamespacePrefix ¶
func (javascriptTransform *JavascriptTransform) AssertNamespacePrefix(urlExpansion string) string
func (*JavascriptTransform) BuildEntities ¶ added in v1.9.0
func (*JavascriptTransform) ByID ¶
func (javascriptTransform *JavascriptTransform) ByID(entityID string, datasets []string) *server.Entity
func (*JavascriptTransform) Clone ¶
func (javascriptTransform *JavascriptTransform) Clone() (*JavascriptTransform, error)
Clone the transform for use in parallel processing
func (*JavascriptTransform) DatasetChanges ¶
func (*JavascriptTransform) EndStoreContext ¶ added in v1.11.0
func (javascriptTransform *JavascriptTransform) EndStoreContext(id string) error
func (*JavascriptTransform) ExecuteQuery ¶
func (javascriptTransform *JavascriptTransform) ExecuteQuery(resultWriter QueryResultWriter) (er error)
func (*JavascriptTransform) ExecuteTransaction ¶
func (javascriptTransform *JavascriptTransform) ExecuteTransaction(txn *server.Transaction) error
func (*JavascriptTransform) GetConfig ¶
func (javascriptTransform *JavascriptTransform) GetConfig() map[string]interface{}
func (*JavascriptTransform) GetNamespacePrefix ¶
func (javascriptTransform *JavascriptTransform) GetNamespacePrefix(urlExpansion string) string
func (*JavascriptTransform) Log ¶
func (javascriptTransform *JavascriptTransform) Log(thing interface{}, logLevel string)
func (*JavascriptTransform) MakeEntityArray ¶
func (javascriptTransform *JavascriptTransform) MakeEntityArray(entities []interface{}) []*server.Entity
func (*JavascriptTransform) NewEntity ¶
func (javascriptTransform *JavascriptTransform) NewEntity() *server.Entity
func (*JavascriptTransform) NewTransaction ¶
func (javascriptTransform *JavascriptTransform) NewTransaction() *server.Transaction
func (*JavascriptTransform) PagedQuery ¶
func (javascriptTransform *JavascriptTransform) PagedQuery( query PagedQueryParams, pageSize int, forEach func(result []server.RelatedEntityResult) bool, ) []*server.RelatedFrom
func (*JavascriptTransform) Query ¶
func (javascriptTransform *JavascriptTransform) Query( startingEntities []string, predicate string, inverse bool, datasets []string, ) [][]interface{}
func (*JavascriptTransform) Timing ¶
func (javascriptTransform *JavascriptTransform) Timing(name string, end bool)
func (*JavascriptTransform) ToString ¶
func (javascriptTransform *JavascriptTransform) ToString(obj interface{}) string
func (*JavascriptTransform) UUID ¶
func (javascriptTransform *JavascriptTransform) UUID() string
func (*JavascriptTransform) WriteQueryResult ¶
func (javascriptTransform *JavascriptTransform) WriteQueryResult(object any) error
type JobConfiguration ¶
type JobConfiguration struct { ID string `json:"id"` Title string `json:"title"` Description string `json:"description"` Tags []string `json:"tags"` Source map[string]interface{} `json:"source"` Sink map[string]interface{} `json:"sink"` Transform map[string]interface{} `json:"transform"` Triggers []JobTrigger `json:"triggers"` Paused bool `json:"paused"` BatchSize int `json:"batchSize"` }
JobConfiguration is the external interfacing object to configure a job. It is also the one that gets persisted in the store.
type JobTrigger ¶
type JobTrigger struct { TriggerType string `json:"triggerType"` JobType string `json:"jobType"` Schedule string `json:"schedule"` MonitoredDataset string `json:"monitoredDataset"` ErrorHandlers ErrorHandlers `json:"onError"` }
type LogFailingEntityHandler ¶
type LogFailingEntityHandler struct { MaxItems int // contains filtered or unexported fields }
type PagedQueryParams ¶
type PipelineSpec ¶
type PipelineSpec struct {
// contains filtered or unexported fields
}
type QueryResultWriter ¶
type ReQueueFailingEntityHandler ¶
type ReQueueFailingEntityHandler struct { MaxItems int // contains filtered or unexported fields }
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
The Runner is used to organize and keep track of configured jobs. It is also responsible for running (duh) jobs. It will also pretty log everything it is doing. The Runner should only be interacted with from the Scheduler.
type ScheduleEntries ¶
type ScheduleEntries struct {
Entries []ScheduleEntry `json:"entries"`
}
type ScheduleEntry ¶
type Scheduler ¶
type Scheduler struct { Logger *zap.SugaredLogger Store *server.Store Runner *Runner DatasetManager *server.DsManager }
The Scheduler deals with reading and writing jobs and making sure they get added to the job Runner. It also deals with translating between the external JobConfiguration and the internal job format.
func NewScheduler ¶
func NewScheduler( env *conf.Config, store *server.Store, dsm *server.DsManager, runner *Runner, ) *Scheduler
NewScheduler returns a new Scheduler. When started, it will load all existing JobConfigurations from the store and schedule them with the runner.
func (*Scheduler) AddJob ¶
func (s *Scheduler) AddJob(jobConfig *JobConfiguration) error
AddJob takes an incoming JobConfiguration and stores it in the store. Once it has stored it, it will transform it to a Pipeline and add it to the scheduler. It is important that jobs are valid, so care is taken to validate the JobConfiguration before it can be scheduled.
func (*Scheduler) DeleteJob ¶
DeleteJob deletes a JobConfiguration, and calls out to the Runner to make sure it also gets removed from the running jobs. It will attempt to load the job before it deletes it, to validate its existence.
func (*Scheduler) GetJobHistory ¶
func (s *Scheduler) GetJobHistory() []*jobResult
GetJobHistory returns a list of history for all jobs that have ever been run on the server. It could be that in the future this will only return the history of the currently registered jobs. Each job stores its Start and End time, together with the last error if any.
func (*Scheduler) GetJobState ¶
func (s *Scheduler) GetJobState(id string) (*SyncJobState, error)
func (*Scheduler) GetRunningJob ¶
GetRunningJob gets the status for a single running job. This can be used to see if a job is still running, and is currently used by the cli to follow a job run operation.
func (*Scheduler) GetRunningJobs ¶
GetRunningJobs gets the status for all running jobs. It can be used to see what the job system is currently doing.
func (*Scheduler) GetScheduleEntries ¶
func (s *Scheduler) GetScheduleEntries() ScheduleEntries
GetScheduleEntries returns a cron list of all currently scheduled entries. Paused jobs are not part of this list.
func (*Scheduler) KillJob ¶
KillJob will stop a job that is currently running. If the job is not running, it will do nothing. If the job that is running is RunOnce, then it will be deleted afterwards.
func (*Scheduler) ListJobs ¶
func (s *Scheduler) ListJobs() []*JobConfiguration
ListJobs returns a list of all stored configurations
func (*Scheduler) LoadJob ¶
func (s *Scheduler) LoadJob(jobID string) (*JobConfiguration, error)
LoadJob will attempt to load a JobConfiguration based on a jobID. Because of the way the GetObject method currently works, it will not return nil when not found, but an empty jobConfig object.
func (*Scheduler) MultiSourceCodeRegistration ¶
func (s *Scheduler) MultiSourceCodeRegistration(code64 string, reg source.DependencyRegistry) error
func (*Scheduler) Parse ¶
func (s *Scheduler) Parse(rawJSON []byte) (*JobConfiguration, error)
Parse is a convenience method to parse raw config json into a JobConfiguration
func (*Scheduler) PauseJob ¶
PauseJob pauses a job. It will not stop a running job, but it will prevent the job from running on the next schedule.
func (*Scheduler) ResetJob ¶
ResetJob will reset the job's since token. This allows the job to be rerun from the beginning.
func (*Scheduler) RunJob ¶
RunJob runs an existing job, if not already running. It does so by adding a temp job to the scheduler, without saving it. The temp job is added with the RunOnce flag set to true
func (*Scheduler) UnpauseJob ¶
UnpauseJob resumes a paused job. It will not run a job, however it will add it to the scheduler so that it can be run on the next schedule.
type Sink ¶
type Sink interface { GetConfig() map[string]interface{} // contains filtered or unexported methods }
Interface defs. Sink is the interface for where data can be pushed.