Documentation
¶
Index ¶
- Variables
- type Logger
- type Metric
- type Option
- type OptionFunc
- type Options
- type Queue
- func (q *Queue) BusyWorkers() int64
- func (q *Queue) CompletedTasks() uint64
- func (q *Queue) FailureTasks() uint64
- func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error
- func (q *Queue) QueueTask(task job.TaskFunc, opts ...job.AllowOption) error
- func (q *Queue) Release()
- func (q *Queue) Shutdown()
- func (q *Queue) Start()
- func (q *Queue) SubmittedTasks() uint64
- func (q *Queue) SuccessTasks() uint64
- func (q *Queue) UpdateWorkerCount(num int64)
- func (q *Queue) Wait()
- type Ring
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrNoTaskInQueue there is nothing in the queue ErrNoTaskInQueue = errors.New("golang-queue: no task in queue") // ErrQueueHasBeenClosed the current queue is closed ErrQueueHasBeenClosed = errors.New("golang-queue: queue has been closed") // ErrMaxCapacity Maximum size limit reached ErrMaxCapacity = errors.New("golang-queue: maximum size limit reached") )
var ErrMissingWorker = errors.New("missing worker module")
ErrMissingWorker missing define worker
var ErrQueueShutdown = errors.New("queue has been closed and released")
ErrQueueShutdown the queue is released and closed.
Functions ¶
This section is empty.
Types ¶
type Logger ¶
type Logger interface {
Infof(format string, args ...interface{})
Errorf(format string, args ...interface{})
Fatalf(format string, args ...interface{})
Info(args ...interface{})
Error(args ...interface{})
Fatal(args ...interface{})
}
Logger interface is used throughout gorush
func NewEmptyLogger ¶
func NewEmptyLogger() Logger
NewEmptyLogger for simple logger.
Example ¶
l := NewEmptyLogger()
l.Info("test")
l.Infof("test")
l.Error("test")
l.Errorf("test")
l.Fatal("test")
l.Fatalf("test")
type Metric ¶
type Metric interface {
IncBusyWorker()
DecBusyWorker()
BusyWorkers() int64
SuccessTasks() uint64
FailureTasks() uint64
SubmittedTasks() uint64
CompletedTasks() uint64
IncSuccessTask()
IncFailureTask()
IncSubmittedTask()
}
Metric interface
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
An Option configures a mutex.
func WithAfterFn ¶
func WithAfterFn(afterFn func()) Option
WithAfterFn set callback function after job done
type Options ¶
type Options struct {
// contains filtered or unexported fields
}
Options for custom args in Queue
func NewOptions ¶
NewOptions initialize the default value for the options
type Queue ¶
A Queue is a message queue.
func NewPool ¶
NewPool initializes a new pool
Example (QueueTask) ¶
taskN := 7
rets := make(chan int, taskN)
// allocate a pool with 5 goroutines to deal with those tasks
p := queue.NewPool(5)
// don't forget to release the pool in the end
defer p.Release()
// assign tasks to asynchronous goroutine pool
for i := 0; i < taskN; i++ {
idx := i
if err := p.QueueTask(func(context.Context) error {
// sleep and return the index
time.Sleep(20 * time.Millisecond)
rets <- idx
return nil
}); err != nil {
log.Println(err)
}
}
// wait until all tasks done
for i := 0; i < taskN; i++ {
fmt.Println("index:", <-rets)
}
Output: index: 3 index: 0 index: 2 index: 4 index: 5 index: 6 index: 1
Example (QueueTaskTimeout) ¶
taskN := 7
rets := make(chan int, taskN)
resps := make(chan error, 1)
// allocate a pool with 5 goroutines to deal with those tasks
q := queue.NewPool(5)
// don't forget to release the pool in the end
defer q.Release()
// assign tasks to asynchronous goroutine pool
for i := 0; i < taskN; i++ {
idx := i
if err := q.QueueTask(func(ctx context.Context) error {
// panic job
if idx == 5 {
panic("system error")
}
// timeout job
if idx == 6 {
time.Sleep(105 * time.Millisecond)
}
select {
case <-ctx.Done():
resps <- ctx.Err()
default:
}
rets <- idx
return nil
}, job.AllowOption{
Timeout: job.Time(100 * time.Millisecond),
}); err != nil {
log.Println(err)
}
}
// wait until all tasks done
for i := 0; i < taskN-1; i++ {
fmt.Println("index:", <-rets)
}
close(resps)
for e := range resps {
fmt.Println(e.Error())
}
fmt.Println("success task count:", q.SuccessTasks())
fmt.Println("failure task count:", q.FailureTasks())
fmt.Println("submitted task count:", q.SubmittedTasks())
Output: index: 3 index: 0 index: 2 index: 4 index: 6 index: 1 context deadline exceeded success task count: 5 failure task count: 2 submitted task count: 7
func (*Queue) BusyWorkers ¶
BusyWorkers returns the numbers of workers in the running process.
func (*Queue) CompletedTasks ¶
CompletedTasks returns the numbers of completed tasks.
func (*Queue) FailureTasks ¶
BusyWorkers returns the numbers of failure tasks.
func (*Queue) Queue ¶
func (q *Queue) Queue(message core.QueuedMessage, opts ...job.AllowOption) error
Queue to queue single job with binary
func (*Queue) SubmittedTasks ¶
BusyWorkers returns the numbers of submitted tasks.
func (*Queue) SuccessTasks ¶
BusyWorkers returns the numbers of success tasks.
func (*Queue) UpdateWorkerCount ¶
UpdateWorkerCount to update worker number dynamically.
type Ring ¶
Ring represents a simple queue using a buffer channel.
func NewRing ¶
NewRing creates a new Ring instance with the provided options. It initializes the task queue with a default size of 2, sets the capacity based on the provided options, and configures the logger and run function. The function returns a pointer to the newly created Ring instance.
Parameters:
opts - A variadic list of Option functions to configure the Ring instance.
Returns:
*Ring - A pointer to the newly created Ring instance.
func (*Ring) Queue ¶
func (s *Ring) Queue(task core.TaskMessage) error
Queue adds a task to the ring buffer queue. It returns an error if the queue is shut down or has reached its maximum capacity.
func (*Ring) Request ¶
func (s *Ring) Request() (core.TaskMessage, error)
Request retrieves the next task message from the ring queue. If the queue has been stopped and is empty, it signals the exit channel and returns an error indicating the queue has been closed. If the queue is empty but not stopped, it returns an error indicating there are no tasks in the queue. If a task is successfully retrieved, it is removed from the queue, and the queue may be resized if it is less than half full. Returns the task message and nil on success, or an error if the queue is empty or has been closed.
func (*Ring) Run ¶
Run executes a new task using the provided context and task message. It calls the runFunc function, which is responsible for processing the task. The context allows for cancellation and timeout control of the task execution.
func (*Ring) Shutdown ¶
Shutdown gracefully shuts down the worker. It sets the stopFlag to indicate that the queue is shutting down and prevents new tasks from being added. If the queue is already shut down, it returns ErrQueueShutdown. It waits for all tasks to be processed before completing the shutdown.