Documentation ¶
Index ¶
- Constants
- func GetConnectionString() string
- type HandleT
- func (jd *HandleT) GetJobHealth() []JobHealthT
- func (jd *HandleT) GetProcessed(stateFilter []string, customValFilters []string, count int, ...) []*JobT
- func (jd *HandleT) GetToRetry(customValFilters []string, count int, sourceIDFilters ...string) []*JobT
- func (jd *HandleT) GetUnprocessed(customValFilters []string, count int, sourceIDFilters ...string) []*JobT
- func (jd *HandleT) Setup(clearAll bool, tablePrefix string, retentionPeriod time.Duration, ...)
- func (jd *HandleT) Store(jobList []*JobT) (map[uuid.UUID]string, bool)
- func (jd *HandleT) UpdateJobStatus(statusList []*JobStatusT, customValFilters []string)
- type JobHealthT
- type JobStatusT
- type JobT
- type OwnerType
Constants ¶
const ( SucceededState = "succeeded" FailedState = "failed" ExecutingState = "executing" AbortedState = "aborted" WaitingState = "waiting" WaitingRetryState = "waiting_retry" InternalState = "NP" )
Constants for the JobState field of JobStatusT.
Variables ¶
This section is empty.
Functions ¶
func GetConnectionString ¶
func GetConnectionString() string
Types ¶
type HandleT ¶
type HandleT struct { MinDSRetentionPeriod time.Duration MaxDSRetentionPeriod time.Duration MaxDSSize *int // TriggerAddNewDS, TriggerMigrateDS is useful for triggering addNewDS to run from tests. // TODO: Ideally we should refactor the code to not use this override. TriggerAddNewDS func() <-chan time.Time TriggerMigrateDS func() <-chan time.Time TriggerRefreshDS func() <-chan time.Time // contains filtered or unexported fields }
func (*HandleT) GetJobHealth ¶ added in v1.1.0
func (jd *HandleT) GetJobHealth() []JobHealthT
UpdateJobStatus updates the status of a batch of jobs. customValFilters[] is passed so we can efficiently mark empty cache. Later we can move this to query.
func (*HandleT) GetProcessed ¶
func (jd *HandleT) GetProcessed(stateFilter []string, customValFilters []string, count int, sourceIDFilters ...string) []*JobT
GetProcessed returns events of a given state. This does not update any state itself and relies on the caller to update it. That means that successive calls to GetProcessed("failed") can return the same set of events. It is the responsibility of the caller to call it from one thread, update the state (to "waiting") in the same thread, and pass the events on to the processors.
func (*HandleT) GetToRetry ¶
func (*HandleT) GetUnprocessed ¶
func (*HandleT) Setup ¶
func (jd *HandleT) Setup(clearAll bool, tablePrefix string, retentionPeriod time.Duration, toBackup bool, softDeletion bool)
Setup is used to initialize the HandleT structure. clearAll = true means it will remove all existing tables. tablePrefix must be unique and is used to separate multiple users of JobsDB. retentionPeriod (dsRetentionPeriod) = a DS is not deleted if it has had some activity within the retention period.
func (*HandleT) UpdateJobStatus ¶
func (jd *HandleT) UpdateJobStatus(statusList []*JobStatusT, customValFilters []string)
type JobHealthT ¶ added in v1.1.0
type JobHealthT struct { SourceName string `json:"source_name"` DestinationName string `json:"destination_name"` AttemptNum int `json:"attempt_num"` DestinationConfig json.RawMessage `json:"destination_config"` ErrorResponse json.RawMessage `json:"error_response"` Payload json.RawMessage `json:"payload"` ExecTime time.Time `json:"exec_time"` }
type JobStatusT ¶
type JobStatusT struct { JobID int64 `json:"JobID"` JobState string `json:"JobState"` // ENUM waiting, executing, succeeded, waiting_retry, failed, aborted, migrating, migrated, wont_migrate AttemptNum int `json:"AttemptNum"` ExecTime time.Time `json:"ExecTime"` RetryTime time.Time `json:"RetryTime"` ErrorCode string `json:"ErrorCode"` ErrorResponse json.RawMessage `json:"ErrorResponse"` Parameters json.RawMessage `json:"Parameters"` JobParameters json.RawMessage `json:"-"` WorkspaceId string `json:"WorkspaceId"` }
type JobT ¶
type JobT struct { UUID uuid.UUID `json:"UUID"` JobID int64 `json:"JobID"` UserID string `json:"UserID"` CreatedAt time.Time `json:"CreatedAt"` ExpireAt time.Time `json:"ExpireAt"` CustomVal string `json:"CustomVal"` EventCount int `json:"EventCount"` EventPayload json.RawMessage `json:"EventPayload"` PayloadSize int64 `json:"PayloadSize"` LastJobStatus JobStatusT `json:"LastJobStatus"` Parameters json.RawMessage `json:"Parameters"` WorkspaceId string `json:"WorkspaceId"` }