Version: v0.0.0-...-2f51daf Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Jun 7, 2023 License: Apache-2.0 Imports: 13 Imported by: 580




View Source
const (
	// KindGeneric : Kind of generic job
	KindGeneric = "Generic"
	// KindScheduled : Kind of scheduled job
	KindScheduled = "Scheduled"
	// KindPeriodic : Kind of periodic job
	KindPeriodic = "Periodic"
View Source
const (
	// SampleJob is name of demo job
	SampleJob = "DEMO"

	// ImageScanJobVendorType is name of scan job it will be used as key to register to job service.
	ImageScanJobVendorType = "IMAGE_SCAN"
	// GarbageCollectionVendorType job name
	GarbageCollectionVendorType = "GARBAGE_COLLECTION"
	// ReplicationVendorType : the name of the replication job in job service
	ReplicationVendorType = "REPLICATION"
	// WebhookJobVendorType : the name of the webhook job in job service
	WebhookJobVendorType = "WEBHOOK"
	// SlackJobVendorType : the name of the slack job in job service
	SlackJobVendorType = "SLACK"
	// RetentionVendorType : the name of the retention job
	RetentionVendorType = "RETENTION"
	// P2PPreheatVendorType : the name of the P2P preheat job
	P2PPreheatVendorType = "P2P_PREHEAT"
	// PurgeAuditVendorType : the name of purge audit job
	PurgeAuditVendorType = "PURGE_AUDIT_LOG"
	// SystemArtifactCleanupVendorType : the name of the SystemArtifact cleanup job
	SystemArtifactCleanupVendorType = "SYSTEM_ARTIFACT_CLEANUP"
	// ScanDataExportVendorType : the name of the scan data export job
	ScanDataExportVendorType = "SCAN_DATA_EXPORT"
	// ExecSweepVendorType: the name of the execution sweep job
	ExecSweepVendorType = "EXECUTION_SWEEP"
	// ScanAllVendorType: the name of the scan all job
	ScanAllVendorType = "SCAN_ALL"


This section is empty.


func GetExecutionSweeperCount

func GetExecutionSweeperCount() map[string]int64

GetExecutionSweeperCount gets the count of execution records retained by the sweeper


type ACK

type ACK struct {
	Status    string `json:"status"`
	Revision  int64  `json:"revision"`
	CheckInAt int64  `json:"check_in_at"`

ACK is the acknowledge of hook event

func (*ACK) JSON

func (a *ACK) JSON() string


type ActionRequest

type ActionRequest struct {
	Action string `json:"action"`

ActionRequest defines for triggering job action like stop/cancel.

type Config

type Config struct {
	RedisPoolConfig *config.RedisPoolConfig `json:"redis_pool_config"`

Config job service config

type Context

type Context interface {
	// Build the context based on the parent context
	// A new job context will be generated based on the current context
	// for the provided job.
	// Returns:
	// new Context based on the parent one
	// error if meet any problems
	Build(tracker Tracker) (Context, error)

	// Get property from the context
	// prop string : key of the context property
	// Returns:
	//  The data of the specified context property if have
	//  bool to indicate if the property existing
	Get(prop string) (interface{}, bool)

	// SystemContext returns the system context
	// Returns:
	//  context.Context
	SystemContext() context.Context

	// Checkin is bridge func for reporting detailed status
	// status string : detailed status
	// Returns:
	//  error if meet any problems
	Checkin(status string) error

	// OPCommand return the control operational command like stop if have
	// Returns:
	//  op command if have
	//  flag to indicate if have command
	OPCommand() (OPCommand, bool)

	// GetLogger returns the logger
	GetLogger() logger.Interface

	// Tracker of job.
	Tracker() Tracker

Context is combination of BaseContext and other job specified resources. Context will be the real execution context for one job.

type ContextInitializer

type ContextInitializer func(ctx context.Context) (Context, error)

ContextInitializer is a func to initialize the concrete job context

type HookCallback

type HookCallback func(hookURL string, change *StatusChange) error

HookCallback defines a callback to trigger when hook events happened

type Interface

type Interface interface {
	// Declare how many times the job can be retried if failed.
	// Return:
	// uint: the failure count allowed. If it is set to 0, then default value 4 is used.
	MaxFails() uint

	// Max currency of the job. Unlike the WorkerPool concurrency, it controls the limit on the number jobs of that type
	// that can be active at one time by within a single redis instance.
	// The default value is 0, which means "no limit on job concurrency".
	MaxCurrency() uint

	// Tell the worker worker if retry the failed job when the fails is
	// still less that the number declared by the method 'MaxFails'.
	// Returns:
	//  true for retry and false for none-retry
	ShouldRetry() bool

	// Indicate whether the parameters of job are valid.
	// Return:
	// error if parameters are not valid. NOTES: If no parameters needed, directly return nil.
	Validate(params Parameters) error

	// Run the business logic here.
	// The related arguments will be injected by the workerpool.
	// ctx Context                   : Job execution context.
	// params map[string]interface{} : parameters with key-pair style for the job execution.
	// Returns:
	//  error if failed to run. NOTES: If job is stopped or cancelled, a specified error should be returned
	Run(ctx Context, params Parameters) error

Interface defines the related injection and run entry methods.

type Metadata

type Metadata struct {
	JobKind       string `json:"kind"`
	ScheduleDelay uint64 `json:"schedule_delay,omitempty"`
	Cron          string `json:"cron_spec,omitempty"`
	IsUnique      bool   `json:"unique"`

Metadata stores the metadata of job.

type OPCommand

type OPCommand string

OPCommand is the type of job operation commands

const (
	// StopCommand is const for stop command
	StopCommand OPCommand = "stop"
	// NilCommand is const for a nil command
	NilCommand OPCommand = "nil"

func (OPCommand) IsStop

func (oc OPCommand) IsStop() bool

IsStop return if the op command is stop

type Parameters

type Parameters map[string]interface{}

Parameters for job execution.

type PrioritySampler

type PrioritySampler interface {
	// Priority for the given job.
	// Job with high priority has the more probabilities to execute.
	// e.g.:
	//   always process X jobs before Y jobs if priorityX > priority Y
	// Arguments:
	//   job string: the job type
	// Returns:
	//   uint: the priority value (between 1 and 10000)
	For(job string) uint

PrioritySampler define the job priority generation method

func Priority

func Priority() PrioritySampler

Priority returns the default job priority sampler implementation.

type Request

type Request struct {
	Job *RequestBody `json:"job"`

Request is the request of launching a job.

type RequestBody

type RequestBody struct {
	Name       string     `json:"name"`
	Parameters Parameters `json:"parameters"`
	Metadata   *Metadata  `json:"metadata"`
	StatusHook string     `json:"status_hook"`

RequestBody keeps the basic info.

type SimpleStatusChange

type SimpleStatusChange struct {
	JobID        string `json:"job_id"`
	TargetStatus string `json:"target_status"`
	Revision     int64  `json:"revision"`

SimpleStatusChange only keeps job ID and the target status

type Stats

type Stats struct {
	Info *StatsInfo `json:"job"`

Stats keeps the result of job launching.

func (*Stats) Validate

func (st *Stats) Validate() error

Validate the job stats

type StatsInfo

type StatsInfo struct {
	JobID         string     `json:"id"`
	Status        string     `json:"status"`
	JobName       string     `json:"name"`
	JobKind       string     `json:"kind"`
	IsUnique      bool       `json:"unique"`
	RefLink       string     `json:"ref_link,omitempty"`
	CronSpec      string     `json:"cron_spec,omitempty"`
	EnqueueTime   int64      `json:"enqueue_time"`
	UpdateTime    int64      `json:"update_time"`
	RunAt         int64      `json:"run_at,omitempty"`
	CheckIn       string     `json:"check_in,omitempty"`
	CheckInAt     int64      `json:"check_in_at,omitempty"`
	DieAt         int64      `json:"die_at,omitempty"`
	WebHookURL    string     `json:"web_hook_url,omitempty"`
	UpstreamJobID string     `json:"upstream_job_id,omitempty"`   // Ref the upstream job if existing
	NumericPID    int64      `json:"numeric_policy_id,omitempty"` // The numeric policy ID of the periodic job
	Parameters    Parameters `json:"parameters,omitempty"`
	Revision      int64      `json:"revision,omitempty"` // For differentiating the each retry of the same job
	HookAck       *ACK       `json:"ack,omitempty"`

StatsInfo keeps the stats of job

type Status

type Status string

Status of job

const (
	// PendingStatus   : job status pending
	PendingStatus Status = "Pending"
	// RunningStatus   : job status running
	RunningStatus Status = "Running"
	// StoppedStatus   : job status stopped
	StoppedStatus Status = "Stopped"
	// ErrorStatus     : job status error
	ErrorStatus Status = "Error"
	// SuccessStatus   : job status success
	SuccessStatus Status = "Success"
	// ScheduledStatus : job status scheduled
	ScheduledStatus Status = "Scheduled"

func (Status) After

func (s Status) After(another Status) bool

After return true if the status s is after another

func (Status) Before

func (s Status) Before(another Status) bool

Before return true if the status s is before another

func (Status) Code

func (s Status) Code() int

Code of job status

func (Status) Compare

func (s Status) Compare(another Status) int

Compare the two job status if < 0, s before another status if == 0, same status if > 0, s after another status Deprecated

func (Status) Equal

func (s Status) Equal(another Status) bool

Equal return true if the status s is the same of another

func (Status) Final

func (s Status) Final() bool

Final returns if the status is final status e.g: "Stopped", "Error" or "Success"

func (Status) String

func (s Status) String() string

String returns the raw string value of the status

func (Status) Validate

func (s Status) Validate() error

Validate the status If it's valid, then return nil error otherwise an non nil error is returned

type StatusChange

type StatusChange struct {
	JobID    string     `json:"job_id"`
	Status   string     `json:"status"`
	CheckIn  string     `json:"check_in,omitempty"`
	Metadata *StatsInfo `json:"metadata,omitempty"`

StatusChange is designed for reporting the status change via hook.

type Tracker

type Tracker interface {
	// Save the job stats which tracked by this tracker to the backend
	// Return:
	//   none nil error returned if any issues happened
	Save() error

	// Load the job stats which tracked by this tracker with the backend data
	// Return:
	//   none nil error returned if any issues happened
	Load() error

	// Get the job stats which tracked by this tracker
	// Returns:
	//  *models.Info : job stats data
	Job() *Stats

	// Update the properties of the job stats
	// fieldAndValues ...interface{} : One or more properties being updated
	// Returns:
	//  error if update failed
	Update(fieldAndValues ...interface{}) error

	// NumericID returns the numeric ID of periodic job.
	// Please pay attention, this only for periodic job.
	NumericID() (int64, error)

	// Mark the periodic job execution to done by update the score
	// of the relation between its periodic policy and execution to -1.
	PeriodicExecutionDone() error

	// Check in message
	CheckIn(message string) error

	// Update status with retry enabled
	UpdateStatusWithRetry(targetStatus Status) error

	// The current status of job
	Status() (Status, error)

	// Switch status to running
	Run() error

	// Switch status to stopped
	Stop() error

	// Switch the status to error
	Fail() error

	// Switch the status to success
	Succeed() error

	// Reset the status to `pending`
	Reset() error

	// Fire status hook to report the current status
	FireHook() error

Tracker is designed to track the life cycle of the job described by the stats The status change is linear and then has strict preorder and successor Check should be enforced before switching

Pending is default status when creating job, so no need to switch

func NewBasicTrackerWithID

func NewBasicTrackerWithID(
	ctx context.Context,
	jobID string,
	ns string,
	pool *redis.Pool,
	callback HookCallback,
	retryList *list.SyncList,
) Tracker

NewBasicTrackerWithID builds a tracker with the provided job ID

func NewBasicTrackerWithStats

func NewBasicTrackerWithStats(
	ctx context.Context,
	stats *Stats,
	ns string,
	pool *redis.Pool,
	callback HookCallback,
	retryList *list.SyncList,
) Tracker

NewBasicTrackerWithStats builds a tracker with the provided job stats


Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL