worker

package module
v0.0.0-...-0b1ed72 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 30, 2024 License: MIT Imports: 26 Imported by: 29

README

Worker

Worker runs a single Job in the background, it can do so immediately or at a scheduled time.

Once registered with QOR Admin, Worker will provide a Workers section in the navigation tree, containing pages for listing and managing the following aspects of Workers:

  • All Jobs.
  • Running: Jobs that are currently running.
  • Scheduled: Jobs which have been scheduled to run at a time in the future.
  • Done: finished Jobs.
  • Errors: any errors output from any Workers that have been run.

The admin interface for a schedulable Job will have an additional Schedule Time input, with which administrators can set the scheduled date and time.

GoDoc

Usage

import "github.com/qor/worker"

func main() {
  // Define Worker
  Worker := worker.New()

  // Arguments used to run a job
  type sendNewsletterArgument struct {
    Subject      string
    Content      string `sql:"size:65532"`
    SendPassword string

    // If job's argument has `worker.Schedule` embedded, it will get run at a scheduled time
    worker.Schedule
  }

  // Register Job
  Worker.RegisterJob(&worker.Job{
    Name: "Send Newsletter", // Registerd Job Name
    Handler: func(argument interface{}, qorJob worker.QorJobInterface) error {
      // `AddLog` add job log
      qorJob.AddLog("Started sending newsletters...")
      qorJob.AddLog(fmt.Sprintf("Argument: %+v", argument.(*sendNewsletterArgument)))

      for i := 1; i <= 100; i++ {
        time.Sleep(100 * time.Millisecond)
        qorJob.AddLog(fmt.Sprintf("Sending newsletter %v...", i))
        // `SetProgress` set job progress percent, from 0 - 100
        qorJob.SetProgress(uint(i))
      }

      qorJob.AddLog("Finished send newsletters")
      return nil
    },
    // Arguments used to run a job
    Resource: Admin.NewResource(&sendNewsletterArgument{}),
  })

  // Add Worker to qor admin, so you could manage jobs in the admin interface
  Admin.AddResource(Worker)
}

Things to note

  • If a Job is scheduled within 2 minutes of the current time, then it will be run immediately.
  • It is possible, via the admin interface, to abort a currently running Job: view the Job's data via Workers > Running or Workers > All Jobs and press the Abort running Job button.
  • It is possible, via the admin interface, to abort a scheduled Job: view the Job's data via Workers > Scheduled or Workers > All Jobs and press the Cancel scheduled Job button.
  • It is possible, via the admin interface, to update a scheduled Job, including setting a new date and time: view the Job's data via Workers > Scheduled or Workers > All Jobs, update the Schedule Time field's value, and press the Update scheduled Job button. Please be aware that scheduling a Job to a date/time in the past will see the Job get run immediately.

Worker Demo: http://demo.getqor.com/admin/workers

License

Released under the MIT License.

Documentation

Index

Constants

View Source
const (
	// JobStatusScheduled job status scheduled
	JobStatusScheduled = "scheduled"
	// JobStatusCancelled job status cancelled
	JobStatusCancelled = "cancelled"
	// JobStatusNew job status new
	JobStatusNew = "new"
	// JobStatusRunning job status running
	JobStatusRunning = "running"
	// JobStatusDone job status done
	JobStatusDone = "done"
	// JobStatusException job status exception
	JobStatusException = "exception"
	// JobStatusKilled job status killed
	JobStatusKilled = "killed"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Queue Queue
	Job   QorJobInterface
	Admin *admin.Admin
}

Config worker config

type Cron

type Cron struct {
	Jobs     []*cronJob
	CronJobs []string
	// contains filtered or unexported fields
}

Cron implemented a worker Queue based on cronjob

func NewCronQueue

func NewCronQueue() *Cron

NewCronQueue initialize a Cron queue

func (*Cron) Add

func (cron *Cron) Add(job QorJobInterface) (err error)

Add a job to cron queue

func (*Cron) Kill

func (cron *Cron) Kill(job QorJobInterface) (err error)

Kill a job from cron queue

func (*Cron) Remove

func (cron *Cron) Remove(job QorJobInterface) error

Remove a job from cron queue

func (*Cron) Run

func (cron *Cron) Run(qorJob QorJobInterface) error

Run a job from cron queue

type Job

type Job struct {
	Name       string
	Group      string
	Handler    func(interface{}, QorJobInterface) error
	Permission *roles.Permission
	Queue      Queue
	Resource   *admin.Resource
	Worker     *Worker
}

Job is a struct that hold Qor Job definations

func (*Job) GetQueue

func (job *Job) GetQueue() Queue

GetQueue get defined job's queue

func (Job) HasPermission

func (job Job) HasPermission(mode roles.PermissionMode, context *qor.Context) bool

func (*Job) NewStruct

func (job *Job) NewStruct() interface{}

NewStruct initialize job struct

type QorJob

type QorJob struct {
	gorm.Model
	Status       string `sql:"default:'new'"`
	Progress     uint
	ProgressText string
	Log          string       `sql:"size:65532"`
	ResultsTable ResultsTable `sql:"size:65532"`

	// Add `valid:"-"“ to make the QorJob work well with qor/validations
	// When the qor/validations auto exec the validate struct callback we get error
	// runtime: goroutine stack exceeds 1000000000-byte limit
	// fatal error: stack overflow
	Job *Job `sql:"-" valid:"-"`

	audited.AuditedModel
	serializable_meta.SerializableMeta
	// contains filtered or unexported fields
}

QorJob predefined qor job struct, which will be used for Worker, if it doesn't include a job resource

func (*QorJob) AddLog

func (job *QorJob) AddLog(log string) error

AddLog add a log to qor job

func (*QorJob) AddResultsRow

func (job *QorJob) AddResultsRow(cells ...TableCell) error

AddResultsRow add a row of process results to a job

func (*QorJob) GetArgument

func (job *QorJob) GetArgument() interface{}

GetArgument get job's argument

func (*QorJob) GetJob

func (job *QorJob) GetJob() *Job

GetJob get predefined job for a qor job instance

func (*QorJob) GetJobID

func (job *QorJob) GetJobID() string

GetJobID get job's ID from a qor job

func (*QorJob) GetJobName

func (job *QorJob) GetJobName() string

GetJobName get job's name from a qor job

func (*QorJob) GetLogs

func (job *QorJob) GetLogs() []string

GetLogs get qor job's logs

func (*QorJob) GetProgress

func (job *QorJob) GetProgress() uint

GetProgress get qor job's progress

func (*QorJob) GetProgressText

func (job *QorJob) GetProgressText() string

GetProgressText get qor job's progress text

func (*QorJob) GetResultsTable

func (job *QorJob) GetResultsTable() ResultsTable

GetResultsTable get the job's process logs

func (*QorJob) GetSerializableArgumentResource

func (job *QorJob) GetSerializableArgumentResource() *admin.Resource

GetSerializableArgumentResource get job's argument's resource

func (*QorJob) GetStatus

func (job *QorJob) GetStatus() string

GetStatus get job's status from a qor job

func (*QorJob) SetJob

func (job *QorJob) SetJob(j *Job)

SetJob set `Job` for a qor job instance

func (*QorJob) SetProgress

func (job *QorJob) SetProgress(progress uint) error

SetProgress set qor job's progress

func (*QorJob) SetProgressText

func (job *QorJob) SetProgressText(str string) error

SetProgressText set qor job's progress text

func (*QorJob) SetStatus

func (job *QorJob) SetStatus(status string) error

SetStatus set job's status to a qor job instance

func (*QorJob) StartReferesh

func (job *QorJob) StartReferesh()

func (*QorJob) StopReferesh

func (job *QorJob) StopReferesh()

type QorJobInterface

type QorJobInterface interface {
	GetJobID() string
	GetJobName() string
	GetStatus() string
	SetStatus(string) error
	GetJob() *Job
	SetJob(*Job)

	GetProgress() uint
	SetProgress(uint) error
	GetProgressText() string
	SetProgressText(string) error
	GetLogs() []string
	AddLog(string) error
	GetResultsTable() ResultsTable
	AddResultsRow(...TableCell) error

	StartReferesh()
	StopReferesh()

	GetArgument() interface{}
	serializable_meta.SerializableMetaInterface
}

QorJobInterface is a interface, defined methods that needs for a qor job

type Queue

type Queue interface {
	Add(QorJobInterface) error
	Run(QorJobInterface) error
	Kill(QorJobInterface) error
	Remove(QorJobInterface) error
}

Queue is an interface defined methods need for a job queue

type ResultsTable

type ResultsTable struct {
	Name       string `json:"-"` // only used for generate string column in database
	TableCells [][]TableCell
}

ResultsTable is a struct, including importing/exporting results

func (*ResultsTable) Scan

func (resultsTable *ResultsTable) Scan(data interface{}) error

Scan used to scan value from database into itself

func (ResultsTable) Value

func (resultsTable ResultsTable) Value() (driver.Value, error)

Value used to read value from itself and save it into databae

type Schedule

type Schedule struct {
	ScheduleTime *time.Time
}

Schedule could be embedded as job argument, then the job will get run as scheduled feature

func (Schedule) GetScheduleTime

func (schedule Schedule) GetScheduleTime() *time.Time

GetScheduleTime get scheduled time

type Scheduler

type Scheduler interface {
	GetScheduleTime() *time.Time
}

Scheduler is a interface, for job used to `GetScheduleTime`

type TableCell

type TableCell struct {
	Value string
	Error string
}

TableCell including Value, Error for a data cell

type Worker

type Worker struct {
	*Config
	JobResource *admin.Resource
	Jobs        []*Job
	// contains filtered or unexported fields
}

Worker worker definition

func New

func New(config ...*Config) *Worker

New create Worker with Config

func (*Worker) AddJob

func (worker *Worker) AddJob(qorJob QorJobInterface) error

AddJob add job to worker

func (*Worker) ConfigureQorResource

func (worker *Worker) ConfigureQorResource(res resource.Resourcer)

ConfigureQorResource a method used to config Worker for qor admin

func (*Worker) ConfigureQorResourceBeforeInitialize

func (worker *Worker) ConfigureQorResourceBeforeInitialize(res resource.Resourcer)

ConfigureQorResourceBeforeInitialize a method used to config Worker for qor admin

func (*Worker) GetJob

func (worker *Worker) GetJob(jobID string) (QorJobInterface, error)

GetJob get job with id

func (*Worker) GetRegisteredJob

func (worker *Worker) GetRegisteredJob(name string) *Job

GetRegisteredJob register a job into Worker

func (*Worker) KillJob

func (worker *Worker) KillJob(jobID string) error

KillJob kill job with job id

func (*Worker) RegisterJob

func (worker *Worker) RegisterJob(job *Job) error

RegisterJob register a job into Worker

func (*Worker) RemoveJob

func (worker *Worker) RemoveJob(jobID string) error

RemoveJob remove job with job id

func (*Worker) RunJob

func (worker *Worker) RunJob(jobID string) error

RunJob run job with job id

func (*Worker) SetQueue

func (worker *Worker) SetQueue(queue Queue)

SetQueue set worker's queue

Directories

Path Synopsis
queue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL