background

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2021 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Package-level sentinel errors and error constructors for jobs.
var (
	// JobConditionalErr is the base error wrapped (via %w) by the five errors
	// that follow it in this group, so callers can match the whole family with
	// errors.Is(err, JobConditionalErr). Its message is empty, which means
	// wrapping it adds nothing to the text of the wrapping error.
	JobConditionalErr       = errors.New("")
	JobNotFound             = fmt.Errorf("Job not found%w", JobConditionalErr)
	JobNotActive            = fmt.Errorf("Job is not active%w", JobConditionalErr)
	JobNoStartTime          = fmt.Errorf("Job has no start time%w", JobConditionalErr)
	OneShotJobUsed          = fmt.Errorf("Job is one-shot and has already ran or been scheduled%w", JobConditionalErr)
	ExecutionAlreadyWaiting = fmt.Errorf("Job already has an execution waiting%w", JobConditionalErr)

	// JobNotDue and JobNotMine are standalone sentinels; they do NOT wrap
	// JobConditionalErr.
	JobNotDue  = errors.New("Job not due")
	JobNotMine = errors.New("Job does not belong to this worker")

	// JobTaskUnknown returns a new error whose message names the given
	// taskName. Each call yields a distinct error value, so compare by
	// message rather than by identity.
	JobTaskUnknown = func(taskName string) error {
		return fmt.Errorf("Job's task is unknown: %s", taskName)
	}

	// UnknownJobStatus returns a new error whose message includes the given
	// execution status. Each call yields a distinct error value.
	UnknownJobStatus = func(status ExecutionStatus) error {
		return fmt.Errorf("Job has unknown status: %s", status)
	}
)

Functions

This section is empty.

Types

type Config

// Config holds an application name plus four database-related string
// settings (DBHost, DBUser, DBPass, DBName).
//
// NOTE(review): the DB* fields are presumably what DBDsn combines into a
// connection string; the implementation is not visible here — confirm.
type Config struct {
	AppName string
	DBHost  string
	DBUser  string
	DBPass  string
	DBName  string
}

func BuildConfig

func BuildConfig(appName string) Config

func (Config) DBDsn

func (c Config) DBDsn() string

type Controller

type Controller struct {
	LoopSeconds float64
	// contains filtered or unexported fields
}

Controller is the job controller, which runs control loops

func NewController

func NewController(r *Repository) *Controller

NewController returns a new controller.

func (*Controller) Block

func (c *Controller) Block(seconds int)

Block optionally runs the control loop, while blocking.

func (*Controller) FinishTaskForJob

func (c *Controller) FinishTaskForJob(jobID uuid.UUID) error

FinishTaskForJob records a job as finished.

func (*Controller) JobFinishingMiddleware

func (c *Controller) JobFinishingMiddleware(next bus.CommandHandler) bus.CommandHandler

JobFinishingMiddleware hooks into the bus's command execution stack and allows it to report to the controller about the job's execution status when it passes through. It should be inserted ABOVE the recovery middleware so that panics don't stop the job status from being reported.

func (*Controller) RegisterQueueAction

func (c *Controller) RegisterQueueAction(qa queueAction)

RegisterQueueAction attaches the queue action (QA) callback.

func (*Controller) Run

func (c *Controller) Run(done chan bool)

Run asynchronously runs the control loop, receiving a done signal.

type ExecutionStatus

// ExecutionStatus is the string-typed status of a single job execution.
type ExecutionStatus string

ExecutionStatus is the current status of one Job Execution.

// Known ExecutionStatus values. The underlying strings are "waiting",
// "processing", "complete" and "none".
//
// NOTE(review): NONE appears to be the "no pending execution" value reported
// by Job.NextExecutionStatus; the exact lifecycle order of the other three is
// not visible here — confirm against the controller implementation.
const (
	WAITING    ExecutionStatus = "waiting"
	PROCESSING ExecutionStatus = "processing"
	COMPLETE   ExecutionStatus = "complete"
	NONE       ExecutionStatus = "none"
)

type Job

// Job is the domain entity for a job: a task whose execution is delayed.
//
// SystemJob, UserID, StartAt and CreatedAt carry explicit snake_case json/db
// tags; the remaining persisted fields have no tags. Executions is tagged
// db:"-".
//
// NOTE(review): the following are assumptions to confirm against the
// implementation, which is not visible here:
//   - Task is presumably the serialized bus.Command passed to NewJob.
//   - Worker is presumably the ID of the worker that has claimed the job, and
//     Heartbeat the time that worker last reported in.
//   - The unit of Frequency is not shown.
//   - Executions is presumably loaded separately rather than mapped from the
//     jobs table, given its db:"-" tag.
type Job struct {
	ID        uuid.UUID
	Name      string
	Frequency int
	SystemJob bool `json:"system_job" db:"system_job"`
	Task      []byte
	UserID    uuid.UUID `json:"user_id" db:"user_id"`
	Worker    uuid.UUID
	Heartbeat time.Time
	Active    bool
	StartAt   time.Time `json:"start_at" db:"start_at"`

	Executions []JobExecution `db:"-"`

	CreatedAt time.Time `json:"created_at" db:"created_at"`
}

Job is a domain entity for a job: a delayed-execution task.

func NewJob

func NewJob(name string, cmd bus.Command) Job

NewJob creates a job with the basic legal defaults.

func (*Job) Complete

func (j *Job) Complete(workerID uuid.UUID) error

Complete updates the job after it has finished

func (Job) IsDue

func (j Job) IsDue() bool

IsDue returns whether the job is due to be scheduled (aka queued)

func (Job) NextExecution

func (j Job) NextExecution() *JobExecution

NextExecution returns the next execution

func (Job) NextExecutionStatus

func (j Job) NextExecutionStatus() ExecutionStatus

NextExecutionStatus returns the status of the next (aka currently pending) execution, or NONE

func (*Job) ScheduleNextExecution

func (j *Job) ScheduleNextExecution() error

ScheduleNextExecution creates the next execution.

func (*Job) ScheduleNow

func (j *Job) ScheduleNow() error

ScheduleNow modifies the job after it's been scheduled/queued

type JobExecution

// JobExecution is the domain entity for one run of a Job.
//
// JobID references the owning job and Status holds the run's
// ExecutionStatus. The embedded Job value is tagged db:"-". CreatedAt,
// ScheduledAt and CompletedAt are timestamps with snake_case json/db tags.
//
// NOTE(review): Next is presumably the time at which this execution is due,
// and Job is presumably populated by the repository rather than mapped from
// a column — confirm against the implementation.
type JobExecution struct {
	ID     uuid.UUID
	JobID  uuid.UUID `json:"job_id" db:"job_id"`
	Status ExecutionStatus
	Next   time.Time

	Job Job `db:"-"`

	CreatedAt   time.Time `json:"created_at" db:"created_at"`
	ScheduledAt time.Time `json:"scheduled_at" db:"scheduled_at"`
	CompletedAt time.Time `json:"completed_at" db:"completed_at"`
}

JobExecution is a domain entity for one run of a Job.

type Repository

type Repository struct {
	// contains filtered or unexported fields
}

Repository handles DB access to the Job aggregate

func NewRepository

func NewRepository(c Config, db *sqlx.DB) *Repository

NewRepository returns a job repository.

func (*Repository) All

func (r *Repository) All() ([]Job, error)

All returns all Job aggregates.

func (*Repository) AssembleInfrastructure

func (r *Repository) AssembleInfrastructure() error

AssembleInfrastructure creates the tables for the repository.

func (*Repository) Begin

func (r *Repository) Begin() (*sqlx.Tx, error)

Begin starts a transaction.

func (*Repository) ClaimFor

func (r *Repository) ClaimFor(workerID uuid.UUID) error

ClaimFor claims any unclaimed/abandoned jobs for a worker

func (*Repository) GetFor

func (r *Repository) GetFor(workerID uuid.UUID) ([]Job, error)

GetFor retrieves a worker's jobs.

func (*Repository) GetOne

func (r *Repository) GetOne(id uuid.UUID) (Job, error)

GetOne retrieves one Job aggregate.

func (*Repository) Heartbeat

func (r *Repository) Heartbeat(workerID uuid.UUID) error

Heartbeat updates the heartbeat for all of a worker's jobs.

func (*Repository) Store

func (r *Repository) Store(job Job) error

Store stores a job in the repository

func (*Repository) StoreAll

func (r *Repository) StoreAll(jobs []Job) error

StoreAll stores a slice of Job aggregates.

type Service

type Service struct {
	// contains filtered or unexported fields
}

func Build

func Build(c Config) *Service

func (*Service) AttachRouter

func (s *Service) AttachRouter(qa queueAction)

func (*Service) Controller

func (s *Service) Controller() *Controller

func (*Service) Delete

func (s *Service) Delete()

func (*Service) RegisterJob

func (s *Service) RegisterJob(j Job) error

func (*Service) Work

func (s *Service) Work(block bool)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL