Documentation ¶
Index ¶
- Constants
- Variables
- func ExecuteInParallel(q *Queue, fn func(interface{}))
- type Job
- func (job *Job) Cancel()
- func (job *Job) GetResult() interface{}
- func (job *Job) IsCancelled() bool
- func (job *Job) IsCancelling() bool
- func (job *Job) IsComplete() bool
- func (job *Job) IsFailed() bool
- func (job *Job) IsFinished() bool
- func (job *Job) IsNotComplete() bool
- func (job *Job) IsPending() bool
- func (job *Job) IsRunning() bool
- func (job *Job) SetResult(result interface{})
- type JobState
- type Queue
- func (q *Queue) Dispose() []interface{}
- func (q *Queue) Disposed() bool
- func (q *Queue) Empty() bool
- func (q *Queue) Get(number int64, items []interface{}) (int64, error)
- func (q *Queue) Len() int64
- func (q *Queue) Peek() (interface{}, error)
- func (q *Queue) Poll(number int64, items []interface{}, timeout time.Duration) (int64, error)
- func (q *Queue) Put(items ...interface{}) error
- func (q *Queue) PutOrUpdate(cmp func(interface{}, interface{}) bool, item interface{}) error
- type Runner
- func (s *Runner) AddNamedWorker(name string) (uint64, error)
- func (s *Runner) IsNamedWorkerBusy(worker string) bool
- func (s *Runner) RunCancelableTask(task func(context.Context)) (uint64, error)
- func (s *Runner) RunJob(desc string, task func() error) error
- func (s *Runner) RunJobWithNamedWorker(desc, worker string, task func() error) error
- func (s *Runner) RunJobWithNamedWorkerWithCB(desc, worker string, task func() error, cb func(*Job)) error
- func (s *Runner) RunTask(task func()) error
- func (s *Runner) Stop() error
- func (s *Runner) StopCancelableTask(id uint64) error
Constants ¶
const ( // Pending job is wait to running Pending = JobState(0) // Running job is running Running = JobState(1) // Cancelling job is cancelling Cancelling = JobState(2) // Cancelled job is cancelled Cancelled = JobState(3) // Finished job is complete Finished = JobState(4) // Failed job is failed when execute Failed = JobState(5) )
Variables ¶
var ( // ErrDisposed is returned when an operation is performed on a disposed // queue. ErrDisposed = errors.New(`queue: disposed`) // ErrTimeout is returned when an applicable queue operation times out. ErrTimeout = errors.New(`queue: poll timed out`) // ErrEmptyQueue is returned when an non-applicable queue operation was called // due to the queue's empty item state ErrEmptyQueue = errors.New(`queue: empty queue`) )
var ( // ErrJobCancelled error job cancelled ErrJobCancelled = errors.New("Job cancelled") )
Functions ¶
func ExecuteInParallel ¶
func ExecuteInParallel(q *Queue, fn func(interface{}))
ExecuteInParallel will (in parallel) call the provided function with each item in the queue until the queue is exhausted. When the queue is exhausted execution is complete and all goroutines will be killed. This means that the queue will be disposed so cannot be used again.
Types ¶
type Job ¶
Job is do for something with state
func (*Job) IsCancelled ¶
IsCancelled returns true if job state is Cancelled
func (*Job) IsCancelling ¶
IsCancelling returns true if job state is Cancelling
func (*Job) IsComplete ¶
IsComplete return true means the job is complete.
func (*Job) IsFinished ¶
IsFinished returns true if job state is Finished
func (*Job) IsNotComplete ¶
IsNotComplete return true means the job is not complete.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is the struct responsible for tracking the state of the queue.
func (*Queue) Dispose ¶
func (q *Queue) Dispose() []interface{}
Dispose will dispose of this queue and returns the items disposed. Any subsequent calls to Get or Put will return an error.
func (*Queue) Disposed ¶
Disposed returns a bool indicating if this queue has had disposed called on it.
func (*Queue) Get ¶
Get retrieves items from the queue. If there are some items in the queue, get will return a number UP TO the number passed in as a parameter. If no items are in the queue, this method will pause until items are added to the queue.
func (*Queue) Peek ¶
Peek returns a the first item in the queue by value without modifying the queue.
func (*Queue) Poll ¶
Poll retrieves items from the queue. If there are some items in the queue, Poll will return a number UP TO the number passed in as a parameter. If no items are in the queue, this method will pause until items are added to the queue or the provided timeout is reached. A non-positive timeout will block until items are added. If a timeout occurs, ErrTimeout is returned.
func (*Queue) PutOrUpdate ¶
PutOrUpdate will add the specified item to the queue, update it if exists
type Runner ¶
Runner TODO
func (*Runner) AddNamedWorker ¶
AddNamedWorker add a named worker, the named worker has uniq queue, so jobs are linear execution
func (*Runner) IsNamedWorkerBusy ¶
IsNamedWorkerBusy returns true if named queue is not empty
func (*Runner) RunCancelableTask ¶
RunCancelableTask run a task that can be cancelled Example:
err := s.RunCancelableTask(func(ctx context.Context) { select { case <-ctx.Done(): // cancelled case <-time.After(time.Second): // do something } })
if err != nil { // hanle error return }
func (*Runner) RunJobWithNamedWorker ¶
RunJobWithNamedWorker run a job in a named worker
func (*Runner) RunJobWithNamedWorkerWithCB ¶
func (s *Runner) RunJobWithNamedWorkerWithCB(desc, worker string, task func() error, cb func(*Job)) error
RunJobWithNamedWorkerWithCB run a job in a named worker
func (*Runner) Stop ¶
Stop stop all task RunTask will failure with an error Wait complete for the tasks that already in execute Cancel the tasks that is not start
func (*Runner) StopCancelableTask ¶
StopCancelableTask stop cancelable spec task