job

package
v0.0.0-...-a6be2c3
Warning

This package is not in the latest version of its module.

Published: Apr 7, 2020 License: GPL-3.0 Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetLogger

func SetLogger(logger *zap.Logger)

Types

type DefaultStorage

type DefaultStorage struct {
}

func (*DefaultStorage) LockJob

func (s *DefaultStorage) LockJob(j *Job) error

func (*DefaultStorage) UpdateJob

func (s *DefaultStorage) UpdateJob(j *Job) error

type Job

type Job struct {
	ID       JobID
	Name     string // Name of the job, supplied when registering it with a worker
	WorkerID string // Worker on which this job is running

	ArrivedAt  time.Time `msgpack:"-"` // Time when a message is received from a Tasks broker
	LockedAt   time.Time `msgpack:"-"` // Time when this Job is locked for execution by a worker
	StartedAt  time.Time `msgpack:"-"` // Time when this Job is actually run
	FinishedAt time.Time `msgpack:"-"` // Time when this Job is marked as finished

	// queue string
	// payload Payload
	Args []interface{} // `json:"args"`
	// contains filtered or unexported fields
}

func ReadJob

func ReadJob(rec []interface{}) *Job

func (*Job) Run

func (j *Job) Run()

Run executes the user-defined function, that is, the task itself. It blocks until the task returns. It should never crash, no matter what the user-supplied function does.

func (*Job) SetTask

func (j *Job) SetTask(f JobFunc)

type JobFunc

type JobFunc func(*Job, ...interface{}) error

type JobID

type JobID = string

type JobID uuid.UUID

func NewJobID

func NewJobID() JobID

type Payload

type Payload struct {
	Class string        `json:"class"`
	Args  []interface{} `json:"args"`
}

type Queue

type Queue struct {
	Name        string
	Concurrency int
	Pool        chan *Job
	Stopped     bool
}

func (*Queue) AddJob

func (q *Queue) AddJob(j *Job)

AddJob sends a job into the queue's "Jobs" channel. If the channel is not full, the job starts immediately.
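The behaviour described here matches how a send on a buffered Go channel works: it proceeds at once while there is free capacity and blocks otherwise. The sketch below shows that property in isolation. It assumes the channel is buffered and that AddJob performs a plain send, neither of which is confirmed by this page; trySend is a hypothetical helper used only to observe whether a send would proceed.

```go
package main

import "fmt"

// Job is a local stand-in, not the package's own type.
type Job struct{ Name string }

// trySend is a hypothetical helper: it attempts a send and reports
// whether it went through immediately instead of blocking.
func trySend(pool chan *Job, j *Job) bool {
	select {
	case pool <- j:
		return true
	default:
		return false
	}
}

func main() {
	pool := make(chan *Job, 2) // capacity 2, chosen for illustration

	fmt.Println(trySend(pool, &Job{Name: "a"})) // free slot
	fmt.Println(trySend(pool, &Job{Name: "b"})) // free slot
	fmt.Println(trySend(pool, &Job{Name: "c"})) // channel is full

	<-pool // a consumer takes one job, freeing a slot
	fmt.Println(trySend(pool, &Job{Name: "c"}))
}
```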

func (*Queue) Start

func (queue *Queue) Start()

Start creates the queue's "workers". It is a blocking function: it waits for all of the workers to finish.

func (*Queue) StopQueue

func (queue *Queue) StopQueue()

type TarantoolStorage

type TarantoolStorage struct {
	// contains filtered or unexported fields
}

func (*TarantoolStorage) Init

func (s *TarantoolStorage) Init(uri string) error

Init prepares (creates) a "table" for tasks.

In Tarantool, a table is called a space.

func (*TarantoolStorage) LockJob

func (s *TarantoolStorage) LockJob(j *Job) error

LockJob "requests permission" for a worker to run the job.

It must insert a record into a consistent storage; if the insert succeeds, the worker can start the job.

If it fails, the worker MUST NOT start the job.

This function is used when a worker receives a new message from TasksBroker and decides whether or not it can start this job.
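The insert-or-fail contract can be illustrated with an in-memory map guarded by a mutex. This is a minimal sketch, not the Tarantool implementation: memStorage, newMemStorage, and errAlreadyLocked are hypothetical names, Job is a reduced local stand-in, and a single-process map is consistent only within that one process.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Job is a reduced local stand-in, not the package's own type.
type Job struct {
	ID       string
	WorkerID string
}

var errAlreadyLocked = errors.New("job already locked")

// memStorage is a hypothetical in-memory stand-in for a consistent store.
type memStorage struct {
	mu    sync.Mutex
	owner map[string]string // job ID -> ID of the worker holding the lock
}

func newMemStorage() *memStorage {
	return &memStorage{owner: make(map[string]string)}
}

// LockJob inserts a record for the job. The insert fails if a record
// for the same job ID already exists, so only one worker can win.
func (s *memStorage) LockJob(j *Job) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, taken := s.owner[j.ID]; taken {
		return errAlreadyLocked
	}
	s.owner[j.ID] = j.WorkerID
	return nil
}

func main() {
	s := newMemStorage()
	first := s.LockJob(&Job{ID: "job-1", WorkerID: "worker-a"})
	second := s.LockJob(&Job{ID: "job-1", WorkerID: "worker-b"})
	fmt.Println(first, second)
}
```

The check and the insert happen under the same mutex, which is what makes the operation atomic here; a database-backed store would rely on a unique key for the same effect.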

func (*TarantoolStorage) RecentJobs

func (s *TarantoolStorage) RecentJobs() ([]*Job, error)

func (*TarantoolStorage) UpdateJob

func (s *TarantoolStorage) UpdateJob(j *Job) error

type TasksStorage

type TasksStorage interface {
	// LockJob "requests permission" for a worker to run the job.
	//
	// It must insert a record into a consistent storage;
	// if the insert succeeds, the worker can start the job.
	//
	// If it fails, the worker MUST NOT start the job.
	//
	// This function is used when a worker receives a new message from
	// TasksBroker and decides whether or not it can start this job.
	LockJob(j *Job) error

	UpdateJob(j *Job) error
}

var Storage TasksStorage
